diff --git a/api/v1beta1/user_types.go b/api/v1beta1/user_types.go index 0a1cf3d1..40085425 100644 --- a/api/v1beta1/user_types.go +++ b/api/v1beta1/user_types.go @@ -37,6 +37,11 @@ type UserSpec struct { // // Note that this import only occurs at creation time, and is ignored once a password has been set on a User. ImportCredentialsSecret *corev1.LocalObjectReference `json:"importCredentialsSecret,omitempty"` + // Limits to apply to a user to restrict the number of connections and channels + // the user can create. These limits can be used as guard rails in environments + // where applications cannot be trusted and monitored in detail, for example, + // when RabbitMQ clusters are offered as a service. See https://www.rabbitmq.com/docs/user-limits. + UserLimits *UserLimits `json:"limits,omitempty"` } // UserStatus defines the observed state of User. @@ -56,6 +61,17 @@ type UserStatus struct { // +kubebuilder:validation:Enum=management;policymaker;monitoring;administrator type UserTag string +// Limits to apply to a user to restrict the number of connections and channels +// the user can create. These limits can be used as guard rails in environments +// where applications cannot be trusted and monitored in detail, for example, +// when RabbitMQ clusters are offered as a service. See https://www.rabbitmq.com/docs/user-limits. +type UserLimits struct { + // Limits how many connections the user can open. + Connections *int32 `json:"connections,omitempty"` + // Limits how many AMQP 0.9.1 channels the user can open. + Channels *int32 `json:"channels,omitempty"` +} + // +genclient // +kubebuilder:object:root=true // +kubebuilder:resource:categories=rabbitmq diff --git a/api/v1beta1/user_types_test.go b/api/v1beta1/user_types_test.go index 4a416584..eb20c452 100644 --- a/api/v1beta1/user_types_test.go +++ b/api/v1beta1/user_types_test.go @@ -92,4 +92,51 @@ var _ = Describe("user spec", func() { }) }) + When("creating a user with limits", func() { + var user User + var username string + var userLimits UserLimits + var connections, channels int32 + + JustBeforeEach(func() { + user = User{ + ObjectMeta: metav1.ObjectMeta{ + Name: username, + Namespace: namespace, + }, + Spec: UserSpec{ + RabbitmqClusterReference: RabbitmqClusterReference{ + Name: "some-cluster", + }, + UserLimits: &userLimits, + }, + } + }) + + When("creating a user with valid limits", func() { + BeforeEach(func() { + username = "limits-user" + connections = 5 + channels = 10 + userLimits = UserLimits{ + Connections: &connections, + Channels: &channels, + } + }) + It("successfully creates the user", func() { + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + fetchedUser := &User{} + Expect(k8sClient.Get(ctx, types.NamespacedName{ + Name: user.Name, + Namespace: user.Namespace, + }, fetchedUser)).To(Succeed()) + Expect(fetchedUser.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{ + Name: "some-cluster", + })) + Expect(fetchedUser.Spec.UserLimits).NotTo(BeNil()) + Expect(*fetchedUser.Spec.UserLimits.Connections).To(Equal(connections)) + Expect(*fetchedUser.Spec.UserLimits.Channels).To(Equal(channels)) + }) + }) + }) }) diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index f558be68..8b6a3809 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -1174,6 +1174,31 @@ func (in *User) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UserLimits) DeepCopyInto(out *UserLimits) { + *out = *in + if in.Connections != nil { + in, out := &in.Connections, &out.Connections + *out = new(int32) + **out = **in + } + if in.Channels != nil { + in, out := &in.Channels, &out.Channels + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UserLimits. +func (in *UserLimits) DeepCopy() *UserLimits { + if in == nil { + return nil + } + out := new(UserLimits) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *UserList) DeepCopyInto(out *UserList) { *out = *in @@ -1220,6 +1245,11 @@ func (in *UserSpec) DeepCopyInto(out *UserSpec) { *out = new(v1.LocalObjectReference) **out = **in } + if in.UserLimits != nil { + in, out := &in.UserLimits, &out.UserLimits + *out = new(UserLimits) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UserSpec. diff --git a/config/crd/bases/rabbitmq.com_users.yaml b/config/crd/bases/rabbitmq.com_users.yaml index 447dbd55..d632125a 100644 --- a/config/crd/bases/rabbitmq.com_users.yaml +++ b/config/crd/bases/rabbitmq.com_users.yaml @@ -64,6 +64,23 @@ spec: type: string type: object x-kubernetes-map-type: atomic + limits: + description: |- + Limits to apply to a user to restrict the number of connections and channels + the user can create. These limits can be used as guard rails in environments + where applications cannot be trusted and monitored in detail, for example, + when RabbitMQ clusters are offered as a service. See https://www.rabbitmq.com/docs/user-limits. + properties: + channels: + description: Limits how many AMQP 0.9.1 channels the user can + open. + format: int32 + type: integer + connections: + description: Limits how many connections the user can open. + format: int32 + type: integer + type: object rabbitmqClusterReference: description: |- Reference to the RabbitmqCluster that the user will be created for. This cluster must diff --git a/controllers/binding_controller_test.go b/controllers/binding_controller_test.go index 21fda664..d2c993b6 100644 --- a/controllers/binding_controller_test.go +++ b/controllers/binding_controller_test.go @@ -95,6 +95,10 @@ var _ = Describe("bindingController", func() { }) When("creating a binding", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &binding)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { bindingName = "test-binding-http-error" diff --git a/controllers/exchange_controller_test.go b/controllers/exchange_controller_test.go index 3260e4b9..1d9ea72a 100644 --- a/controllers/exchange_controller_test.go +++ b/controllers/exchange_controller_test.go @@ -97,6 +97,10 @@ var _ = Describe("exchange-controller", func() { }) Context("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { exchangeName = "test-http-error" @@ -150,6 +154,7 @@ var _ = Describe("exchange-controller", func() { }) }) }) + Context("LastTransitionTime", func() { BeforeEach(func() { exchangeName = "test-last-transition-time" @@ -158,6 +163,11 @@ var _ = Describe("exchange-controller", func() { StatusCode: http.StatusCreated, }, nil) }) + + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed()) + }) + It("changes only if status changes", func() { By("setting LastTransitionTime when transitioning to status Ready=true") Expect(k8sClient.Create(ctx, &exchange)).To(Succeed()) diff --git a/controllers/federation_controller_test.go b/controllers/federation_controller_test.go index de9f5a17..6bb6a399 100644 --- a/controllers/federation_controller_test.go +++ b/controllers/federation_controller_test.go @@ -98,6 +98,10 @@ var _ = Describe("federation-controller", func() { }) When("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &federation)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { federationName = "test-federation-http-error" diff --git a/controllers/operatorpolicy_controller_test.go b/controllers/operatorpolicy_controller_test.go index 3055c0e2..1a27f191 100644 --- a/controllers/operatorpolicy_controller_test.go +++ b/controllers/operatorpolicy_controller_test.go @@ -101,6 +101,10 @@ var _ = Describe("operatorpolicy-controller", func() { }) Context("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &policy)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { policyName = "test-http-error" diff --git a/controllers/permission_controller_test.go b/controllers/permission_controller_test.go index e8d1b86f..24f43cf1 100644 --- a/controllers/permission_controller_test.go +++ b/controllers/permission_controller_test.go @@ -101,6 +101,10 @@ var _ = Describe("permission-controller", func() { }) Context("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { permissionName = "test-with-username-http-error" @@ -261,9 +265,21 @@ var _ = Describe("permission-controller", func() { Status: "204 No Content", StatusCode: http.StatusNoContent, }, nil) + fakeRabbitMQClient.PutUserLimitsReturns(&http.Response{ + Status: "201 Created", + StatusCode: http.StatusCreated, + }, nil) + fakeRabbitMQClient.DeleteUserLimitsReturns(&http.Response{ + Status: "204 No Content", + StatusCode: http.StatusNoContent, + }, nil) }) Context("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &permission)).To(Succeed()) + }) + When("user not exist", func() { BeforeEach(func() { permissionName = "test-with-userref-create-not-exist" @@ -407,12 +423,16 @@ var _ = Describe("permission-controller", func() { }) }) - Context("ownerref", func() { + When("the user already exists", func() { BeforeEach(func() { permissionName = "ownerref-with-userref-test" userName = "example-ownerref" }) + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) + }) + It("sets the correct deletion ownerref to the object", func() { Expect(k8sClient.Create(ctx, &user)).To(Succeed()) user.Status.Username = userName diff --git a/controllers/policy_controller_test.go b/controllers/policy_controller_test.go index c3938c18..2cc75262 100644 --- a/controllers/policy_controller_test.go +++ b/controllers/policy_controller_test.go @@ -100,6 +100,10 @@ var _ = Describe("policy-controller", func() { }) Context("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &policy)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { policyName = "test-http-error" diff --git a/controllers/queue_controller_test.go b/controllers/queue_controller_test.go index 9cad0525..adb26c4a 100644 --- a/controllers/queue_controller_test.go +++ b/controllers/queue_controller_test.go @@ -96,6 +96,10 @@ var _ = Describe("queue-controller", func() { }) Context("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &queue)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { queueName = "test-http-error" diff --git a/controllers/schemareplication_controller_test.go b/controllers/schemareplication_controller_test.go index 073a190f..c2bbda23 100644 --- a/controllers/schemareplication_controller_test.go +++ b/controllers/schemareplication_controller_test.go @@ -103,6 +103,10 @@ var _ = Describe("schema-replication-controller", func() { }) When("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &replication)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { replicationName = "test-replication-http-error" @@ -257,6 +261,7 @@ var _ = Describe("schema-replication-controller", func() { }) AfterEach(func() { + Expect(k8sClient.Delete(ctx, &replication)).To(Succeed()) rabbitmqclient.SecretStoreClientProvider = rabbitmqclient.GetSecretStoreClient }) diff --git a/controllers/shovel_controller_test.go b/controllers/shovel_controller_test.go index df18a61c..8eeb7d74 100644 --- a/controllers/shovel_controller_test.go +++ b/controllers/shovel_controller_test.go @@ -100,6 +100,10 @@ var _ = Describe("shovel-controller", func() { }) When("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &shovel)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { shovelName = "test-shovel-http-error" diff --git a/controllers/super_stream_controller_test.go b/controllers/super_stream_controller_test.go index 784454e0..aa6e96b7 100644 --- a/controllers/super_stream_controller_test.go +++ b/controllers/super_stream_controller_test.go @@ -124,6 +124,10 @@ var _ = Describe("super-stream-controller", func() { }) Context("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &superStream)).To(Succeed()) + }) + When("an underlying resource is deleted", func() { JustBeforeEach(func() { Expect(k8sClient.Create(ctx, &superStream)).To(Succeed()) diff --git a/controllers/topicpermission_controller_test.go b/controllers/topicpermission_controller_test.go index 3c956639..fb229a2c 100644 --- a/controllers/topicpermission_controller_test.go +++ b/controllers/topicpermission_controller_test.go @@ -103,6 +103,10 @@ var _ = Describe("topicpermission-controller", func() { }) Context("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { name = "test-with-username-http-error" @@ -284,6 +288,10 @@ var _ = Describe("topicpermission-controller", func() { }) Context("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) + }) + When("user not exist", func() { BeforeEach(func() { name = "test-with-userref-create-not-exist" @@ -457,6 +465,11 @@ var _ = Describe("topicpermission-controller", func() { userName = "topic-perm-topic-perm-user" }) + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) + Expect(k8sClient.Delete(ctx, &topicperm)).To(Succeed()) + }) + It("sets the correct deletion ownerref to the object", func() { Expect(k8sClient.Create(ctx, &user)).To(Succeed()) user.Status.Username = userName diff --git a/controllers/user_controller.go b/controllers/user_controller.go index 8c7eb340..9d045148 100644 --- a/controllers/user_controller.go +++ b/controllers/user_controller.go @@ -14,6 +14,7 @@ import ( "errors" "fmt" + rabbithole "github.com/michaelklishin/rabbit-hole/v3" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" "github.com/rabbitmq/messaging-topology-operator/internal" "github.com/rabbitmq/messaging-topology-operator/rabbitmqclient" @@ -221,7 +222,55 @@ func (r *UserReconciler) DeclareFunc(ctx context.Context, client rabbitmqclient. } logger.Info("Generated user settings", "user", user.Name, "settings", userSettings) - return validateResponse(client.PutUser(userSettings.Name, userSettings)) + err = validateResponse(client.PutUser(userSettings.Name, userSettings)) + if err != nil { + return err + } + + newUserLimits := internal.GenerateUserLimits(user.Spec.UserLimits) + logger.Info("Getting existing user limits", "user", user.Name) + existingUserLimits, err := r.getUserLimits(client, string(credentials.Data["username"])) + if err != nil { + return err + } + limitsToDelete := r.userLimitsToDelete(existingUserLimits, newUserLimits) + if len(limitsToDelete) > 0 { + logger.Info("Deleting outdated user limits", "user", user.Name, "limits", limitsToDelete) + err = validateResponseForDeletion(client.DeleteUserLimits(string(credentials.Data["username"]), limitsToDelete)) + if err != nil && !errors.Is(err, NotFound) { + return err + } + } + if len(newUserLimits) > 0 { + logger.Info("Creating new user limits", "user", user.Name, "limits", newUserLimits) + return validateResponse(client.PutUserLimits(string(credentials.Data["username"]), newUserLimits)) + } + return nil +} + +func (r *UserReconciler) userLimitsToDelete(existingUserLimits, newUserLimits rabbithole.UserLimitsValues) (limitsToDelete rabbithole.UserLimits) { + userLimitKeys := []string{"max-connections", "max-channels"} + for _, limit := range userLimitKeys { + _, oldExists := existingUserLimits[limit] + _, newExists := newUserLimits[limit] + if oldExists && !newExists { + limitsToDelete = append(limitsToDelete, limit) + } + } + return limitsToDelete +} + +func (r *UserReconciler) getUserLimits(client rabbitmqclient.Client, username string) (rabbithole.UserLimitsValues, error) { + userLimitsInfo, err := client.GetUserLimits(username) + if errors.Is(err, error(rabbithole404)) { + return rabbithole.UserLimitsValues{}, nil + } else if err != nil { + return rabbithole.UserLimitsValues{}, err + } + if len(userLimitsInfo) == 0 { + return rabbithole.UserLimitsValues{}, nil + } + return userLimitsInfo[0].Value, nil } func (r *UserReconciler) getUserCredentials(ctx context.Context, user *topology.User) (*corev1.Secret, error) { diff --git a/controllers/user_controller_test.go b/controllers/user_controller_test.go index 5a432dc8..e97519b9 100644 --- a/controllers/user_controller_test.go +++ b/controllers/user_controller_test.go @@ -15,6 +15,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/metrics/server" + rabbithole "github.com/michaelklishin/rabbit-hole/v3" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" @@ -33,6 +34,9 @@ var _ = Describe("UserController", func() { managerCtx context.Context managerCancel context.CancelFunc k8sClient runtimeClient.Client + userLimits topology.UserLimits + connections int32 + channels int32 ) BeforeEach(func() { @@ -91,11 +95,16 @@ var _ = Describe("UserController", func() { RabbitmqClusterReference: topology.RabbitmqClusterReference{ Name: "example-rabbit", }, + UserLimits: &userLimits, }, } }) When("creating a user", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &user)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { userName = "test-user-http-error" @@ -154,6 +163,116 @@ var _ = Describe("UserController", func() { }))) }) }) + + Context("user limits", func() { + When("the user has limits defined", func() { + BeforeEach(func() { + userName = "test-user-limits" + connections = 5 + channels = 10 + userLimits = topology.UserLimits{ + Connections: &connections, + Channels: &channels, + } + fakeRabbitMQClient.PutUserReturns(&http.Response{ + Status: "201 Created", + StatusCode: http.StatusCreated, + }, nil) + fakeRabbitMQClient.PutUserLimitsReturns(&http.Response{ + Status: "201 Created", + StatusCode: http.StatusCreated, + }, nil) + fakeRabbitMQClient.GetUserLimitsReturns(nil, rabbithole.ErrorResponse{ + StatusCode: 404, + Message: "Object Not Found", + Reason: "Not Found", + }) + }) + + It("should create the user limits", func() { + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( + ctx, + types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, + &user, + ) + + return user.Status.Conditions + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) + By("calling PutUserLimits with the correct user limits") + Expect(fakeRabbitMQClient.PutUserLimitsCallCount()).To(BeNumerically(">", 0)) + _, userLimitsValues := fakeRabbitMQClient.PutUserLimitsArgsForCall(0) + Expect(userLimitsValues).To(HaveKeyWithValue("max-connections", int(connections))) + Expect(userLimitsValues).To(HaveKeyWithValue("max-channels", (int(channels)))) + }) + }) + + When("the user already has existing limits that differ from the new limits", func() { + BeforeEach(func() { + userName = "test-changed-user-limits" + connections = 5 + userLimits = topology.UserLimits{ + Connections: &connections, + Channels: nil, + } + var userLimitsInfo []rabbithole.UserLimitsInfo + userLimitsInfo = append(userLimitsInfo, rabbithole.UserLimitsInfo{ + User: userName, + Value: rabbithole.UserLimitsValues{"max-channels": 10, "max-connections": 3}, + }) + fakeRabbitMQClient.PutUserReturns(&http.Response{ + Status: "201 Created", + StatusCode: http.StatusCreated, + }, nil) + fakeRabbitMQClient.PutUserLimitsReturns(&http.Response{ + Status: "201 Created", + StatusCode: http.StatusCreated, + }, nil) + fakeRabbitMQClient.GetUserLimitsReturns(userLimitsInfo, nil) + fakeRabbitMQClient.DeleteUserLimitsReturns(&http.Response{ + Status: "204 No Content", + StatusCode: http.StatusNoContent, + }, nil) + }) + + It("should update the existing user limit and delete the unused old limit", func() { + Expect(k8sClient.Create(ctx, &user)).To(Succeed()) + Eventually(func() []topology.Condition { + _ = k8sClient.Get( + ctx, + types.NamespacedName{Name: user.Name, Namespace: user.Namespace}, + &user, + ) + + return user.Status.Conditions + }). + Within(statusEventsUpdateTimeout). + WithPolling(time.Second). + Should(ContainElement(MatchFields(IgnoreExtras, Fields{ + "Type": Equal(topology.ConditionType("Ready")), + "Reason": Equal("SuccessfulCreateOrUpdate"), + "Status": Equal(corev1.ConditionTrue), + }))) + By("calling DeleteUserLimits with the unused old user limits") + Expect(fakeRabbitMQClient.DeleteUserLimitsCallCount()).To(BeNumerically(">", 0)) + _, userLimits := fakeRabbitMQClient.DeleteUserLimitsArgsForCall(0) + Expect(userLimits).To(HaveLen(1)) + Expect(userLimits).To(ContainElement("max-channels")) + By("calling PutUserLimits with the correct new user limits") + Expect(fakeRabbitMQClient.PutUserLimitsCallCount()).To(BeNumerically(">", 0)) + _, userLimitsValues := fakeRabbitMQClient.PutUserLimitsArgsForCall(0) + Expect(userLimitsValues).To(HaveKeyWithValue("max-connections", int(connections))) + }) + }) + }) }) When("deleting a user", func() { @@ -162,6 +281,14 @@ var _ = Describe("UserController", func() { Status: "201 Created", StatusCode: http.StatusCreated, }, nil) + fakeRabbitMQClient.PutUserLimitsReturns(&http.Response{ + Status: "201 Created", + StatusCode: http.StatusCreated, + }, nil) + fakeRabbitMQClient.DeleteUserLimitsReturns(&http.Response{ + Status: "204 No Content", + StatusCode: http.StatusNoContent, + }, nil) Expect(k8sClient.Create(ctx, &user)).To(Succeed()) Eventually(func() []topology.Condition { _ = k8sClient.Get( diff --git a/controllers/utils.go b/controllers/utils.go index 14015047..c12717e7 100644 --- a/controllers/utils.go +++ b/controllers/utils.go @@ -18,6 +18,7 @@ import ( "strings" "time" + rabbithole "github.com/michaelklishin/rabbit-hole/v3" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" "github.com/rabbitmq/messaging-topology-operator/rabbitmqclient" "k8s.io/client-go/tools/record" @@ -30,6 +31,13 @@ import ( corev1 "k8s.io/api/core/v1" ) +// returned in some cases as an error when rabbithole encounters a 404 response +var rabbithole404 = rabbithole.ErrorResponse{ + StatusCode: 404, + Message: "Object Not Found", + Reason: "Not Found", +} + // TODO: check possible status code response from RabbitMQ // validate status code above 300 might not be all failure case func validateResponse(res *http.Response, err error) error { diff --git a/controllers/vhost_controller_test.go b/controllers/vhost_controller_test.go index c5132655..80fdc4a1 100644 --- a/controllers/vhost_controller_test.go +++ b/controllers/vhost_controller_test.go @@ -95,6 +95,10 @@ var _ = Describe("vhost-controller", func() { }) Context("creation", func() { + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &vhost)).To(Succeed()) + }) + When("the RabbitMQ Client returns a HTTP error response", func() { BeforeEach(func() { vhostName = "test-http-error" diff --git a/docs/api/rabbitmq.com.ref.asciidoc b/docs/api/rabbitmq.com.ref.asciidoc index b24cdb41..d0277f13 100644 --- a/docs/api/rabbitmq.com.ref.asciidoc +++ b/docs/api/rabbitmq.com.ref.asciidoc @@ -1381,6 +1381,27 @@ More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api- |=== +[id="{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-userlimits"] +==== UserLimits + +Limits to apply to a user to restrict the number of connections and channels +the user can create. These limits can be used as guard rails in environments +where applications cannot be trusted and monitored in detail, for example, +when RabbitMQ clusters are offered as a service. See https://www.rabbitmq.com/docs/user-limits. + +.Appears In: +**** +- xref:{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-userspec[$$UserSpec$$] +**** + +[cols="25a,75a", options="header"] +|=== +| Field | Description +| *`connections`* __integer__ | Limits how many connections the user can open. +| *`channels`* __integer__ | Limits how many AMQP 0.9.1 channels the user can open. +|=== + + [id="{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-userlist"] ==== UserList @@ -1437,6 +1458,10 @@ password will be generated. The Secret must have the following keys in its Data * `password` – Plain-text password. Will be used only if the `passwordHash` key is missing. Note that this import only occurs at creation time, and is ignored once a password has been set on a User. +| *`limits`* __xref:{anchor_prefix}-github-com-rabbitmq-messaging-topology-operator-api-v1beta1-userlimits[$$UserLimits$$]__ | Limits to apply to a user to restrict the number of connections and channels +the user can create. These limits can be used as guard rails in environments +where applications cannot be trusted and monitored in detail, for example, +when RabbitMQ clusters are offered as a service. See https://www.rabbitmq.com/docs/user-limits. |=== diff --git a/internal/user.go b/internal/user.go index 4d745eaf..88c864e6 100644 --- a/internal/user.go +++ b/internal/user.go @@ -62,3 +62,16 @@ func GenerateUserSettings(credentials *corev1.Secret, tags []topology.UserTag) ( HashingAlgorithm: rabbithole.HashingAlgorithmSHA512, }, nil } + +func GenerateUserLimits(userLimits *topology.UserLimits) rabbithole.UserLimitsValues { + userLimitsValues := rabbithole.UserLimitsValues{} + if userLimits != nil { + if userLimits.Connections != nil { + userLimitsValues["max-connections"] = int(*userLimits.Connections) + } + if userLimits.Channels != nil { + userLimitsValues["max-channels"] = int(*userLimits.Channels) + } + } + return userLimitsValues +} diff --git a/internal/user_test.go b/internal/user_test.go index 0c35aa77..f1874719 100644 --- a/internal/user_test.go +++ b/internal/user_test.go @@ -63,4 +63,36 @@ var _ = Describe("GenerateUserSettings", func() { // Password should not be sent, even if provided Expect(settings.Password).To(BeEmpty()) }) + + When("user limits are provided", func() { + var connections, channels int32 + + It("uses the limits to generate the expected rabbithole.UserLimits", func() { + connections = 3 + channels = 7 + limits := internal.GenerateUserLimits(&topology.UserLimits{ + Connections: &connections, + Channels: &channels, + }) + Expect(limits).To(HaveLen(2)) + Expect(limits).To(HaveKeyWithValue("max-connections", int(connections))) + Expect(limits).To(HaveKeyWithValue("max-channels", int(channels))) + }) + + It("does not create unspecified limits", func() { + connections = 5 + limits := internal.GenerateUserLimits(&topology.UserLimits{ + Connections: &connections, + }) + Expect(limits).To(HaveKeyWithValue("max-connections", int(connections))) + Expect(limits).NotTo(HaveKey("max-channels")) + }) + }) + + When("no user limits are provided", func() { + It("does not specify limits", func() { + limits := internal.GenerateUserLimits(&topology.UserLimits{}) + Expect(limits).To(BeEmpty()) + }) + }) }) diff --git a/rabbitmqclient/rabbitmq_client_factory.go b/rabbitmqclient/rabbitmq_client_factory.go index 6596fdb8..db1f368a 100644 --- a/rabbitmqclient/rabbitmq_client_factory.go +++ b/rabbitmqclient/rabbitmq_client_factory.go @@ -22,6 +22,9 @@ import ( type Client interface { PutUser(string, rabbithole.UserSettings) (*http.Response, error) DeleteUser(string) (*http.Response, error) + PutUserLimits(string, rabbithole.UserLimitsValues) (*http.Response, error) + GetUserLimits(string) ([]rabbithole.UserLimitsInfo, error) + DeleteUserLimits(string, rabbithole.UserLimits) (*http.Response, error) DeclareBinding(string, rabbithole.BindingInfo) (*http.Response, error) DeleteBinding(string, rabbithole.BindingInfo) (*http.Response, error) ListQueueBindingsBetween(string, string, string) ([]rabbithole.BindingInfo, error) diff --git a/rabbitmqclient/rabbitmqclientfakes/fake_client.go b/rabbitmqclient/rabbitmqclientfakes/fake_client.go index c8a7aed9..2fe437b0 100644 --- a/rabbitmqclient/rabbitmqclientfakes/fake_client.go +++ b/rabbitmqclient/rabbitmqclientfakes/fake_client.go @@ -223,6 +223,20 @@ type FakeClient struct { result1 *http.Response result2 error } + DeleteUserLimitsStub func(string, rabbithole.UserLimits) (*http.Response, error) + deleteUserLimitsMutex sync.RWMutex + deleteUserLimitsArgsForCall []struct { + arg1 string + arg2 rabbithole.UserLimits + } + deleteUserLimitsReturns struct { + result1 *http.Response + result2 error + } + deleteUserLimitsReturnsOnCall map[int]struct { + result1 *http.Response + result2 error + } DeleteVhostStub func(string) (*http.Response, error) deleteVhostMutex sync.RWMutex deleteVhostArgsForCall []struct { @@ -250,6 +264,19 @@ type FakeClient struct { result1 *rabbithole.DetailedQueueInfo result2 error } + GetUserLimitsStub func(string) ([]rabbithole.UserLimitsInfo, error) + getUserLimitsMutex sync.RWMutex + getUserLimitsArgsForCall []struct { + arg1 string + } + getUserLimitsReturns struct { + result1 []rabbithole.UserLimitsInfo + result2 error + } + getUserLimitsReturnsOnCall map[int]struct { + result1 []rabbithole.UserLimitsInfo + result2 error + } GetVhostStub func(string) (*rabbithole.VhostInfo, error) getVhostMutex sync.RWMutex getVhostArgsForCall []struct { @@ -366,6 +393,20 @@ type FakeClient struct { result1 *http.Response result2 error } + PutUserLimitsStub func(string, rabbithole.UserLimitsValues) (*http.Response, error) + putUserLimitsMutex sync.RWMutex + putUserLimitsArgsForCall []struct { + arg1 string + arg2 rabbithole.UserLimitsValues + } + putUserLimitsReturns struct { + result1 *http.Response + result2 error + } + putUserLimitsReturnsOnCall map[int]struct { + result1 *http.Response + result2 error + } PutVhostStub func(string, rabbithole.VhostSettings) (*http.Response, error) putVhostMutex sync.RWMutex putVhostArgsForCall []struct { @@ -1392,6 +1433,71 @@ func (fake *FakeClient) DeleteUserReturnsOnCall(i int, result1 *http.Response, r }{result1, result2} } +func (fake *FakeClient) DeleteUserLimits(arg1 string, arg2 rabbithole.UserLimits) (*http.Response, error) { + fake.deleteUserLimitsMutex.Lock() + ret, specificReturn := fake.deleteUserLimitsReturnsOnCall[len(fake.deleteUserLimitsArgsForCall)] + fake.deleteUserLimitsArgsForCall = append(fake.deleteUserLimitsArgsForCall, struct { + arg1 string + arg2 rabbithole.UserLimits + }{arg1, arg2}) + stub := fake.DeleteUserLimitsStub + fakeReturns := fake.deleteUserLimitsReturns + fake.recordInvocation("DeleteUserLimits", []interface{}{arg1, arg2}) + fake.deleteUserLimitsMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeClient) DeleteUserLimitsCallCount() int { + fake.deleteUserLimitsMutex.RLock() + defer fake.deleteUserLimitsMutex.RUnlock() + return len(fake.deleteUserLimitsArgsForCall) +} + +func (fake *FakeClient) DeleteUserLimitsCalls(stub func(string, rabbithole.UserLimits) (*http.Response, error)) { + fake.deleteUserLimitsMutex.Lock() + defer fake.deleteUserLimitsMutex.Unlock() + fake.DeleteUserLimitsStub = stub +} + +func (fake *FakeClient) DeleteUserLimitsArgsForCall(i int) (string, rabbithole.UserLimits) { + fake.deleteUserLimitsMutex.RLock() + defer fake.deleteUserLimitsMutex.RUnlock() + argsForCall := fake.deleteUserLimitsArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeClient) DeleteUserLimitsReturns(result1 *http.Response, result2 error) { + fake.deleteUserLimitsMutex.Lock() + defer fake.deleteUserLimitsMutex.Unlock() + fake.DeleteUserLimitsStub = nil + fake.deleteUserLimitsReturns = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + +func (fake *FakeClient) DeleteUserLimitsReturnsOnCall(i int, result1 *http.Response, result2 error) { + fake.deleteUserLimitsMutex.Lock() + defer fake.deleteUserLimitsMutex.Unlock() + fake.DeleteUserLimitsStub = nil + if fake.deleteUserLimitsReturnsOnCall == nil { + fake.deleteUserLimitsReturnsOnCall = make(map[int]struct { + result1 *http.Response + result2 error + }) + } + fake.deleteUserLimitsReturnsOnCall[i] = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + func (fake *FakeClient) DeleteVhost(arg1 string) (*http.Response, error) { fake.deleteVhostMutex.Lock() ret, specificReturn := fake.deleteVhostReturnsOnCall[len(fake.deleteVhostArgsForCall)] @@ -1521,6 +1627,70 @@ func (fake *FakeClient) GetQueueReturnsOnCall(i int, result1 *rabbithole.Detaile }{result1, result2} } +func (fake *FakeClient) GetUserLimits(arg1 string) ([]rabbithole.UserLimitsInfo, error) { + fake.getUserLimitsMutex.Lock() + ret, specificReturn := fake.getUserLimitsReturnsOnCall[len(fake.getUserLimitsArgsForCall)] + fake.getUserLimitsArgsForCall = append(fake.getUserLimitsArgsForCall, struct { + arg1 string + }{arg1}) + stub := fake.GetUserLimitsStub + fakeReturns := fake.getUserLimitsReturns + fake.recordInvocation("GetUserLimits", []interface{}{arg1}) + fake.getUserLimitsMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeClient) GetUserLimitsCallCount() int { + fake.getUserLimitsMutex.RLock() + defer fake.getUserLimitsMutex.RUnlock() + return len(fake.getUserLimitsArgsForCall) +} + +func (fake *FakeClient) GetUserLimitsCalls(stub func(string) ([]rabbithole.UserLimitsInfo, error)) { + fake.getUserLimitsMutex.Lock() + defer fake.getUserLimitsMutex.Unlock() + fake.GetUserLimitsStub = stub +} + +func (fake *FakeClient) GetUserLimitsArgsForCall(i int) string { + fake.getUserLimitsMutex.RLock() + defer fake.getUserLimitsMutex.RUnlock() + argsForCall := fake.getUserLimitsArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeClient) GetUserLimitsReturns(result1 []rabbithole.UserLimitsInfo, result2 error) { + fake.getUserLimitsMutex.Lock() + defer fake.getUserLimitsMutex.Unlock() + fake.GetUserLimitsStub = nil + fake.getUserLimitsReturns = struct { + result1 []rabbithole.UserLimitsInfo + result2 error + }{result1, result2} +} + +func (fake *FakeClient) GetUserLimitsReturnsOnCall(i int, result1 []rabbithole.UserLimitsInfo, result2 error) { + fake.getUserLimitsMutex.Lock() + defer fake.getUserLimitsMutex.Unlock() + fake.GetUserLimitsStub = nil + if fake.getUserLimitsReturnsOnCall == nil { + fake.getUserLimitsReturnsOnCall = make(map[int]struct { + result1 []rabbithole.UserLimitsInfo + result2 error + }) + } + fake.getUserLimitsReturnsOnCall[i] = struct { + result1 []rabbithole.UserLimitsInfo + result2 error + }{result1, result2} +} + func (fake *FakeClient) GetVhost(arg1 string) (*rabbithole.VhostInfo, error) { fake.getVhostMutex.Lock() ret, specificReturn := fake.getVhostReturnsOnCall[len(fake.getVhostArgsForCall)] @@ -2045,6 +2215,71 @@ func (fake *FakeClient) PutUserReturnsOnCall(i int, result1 *http.Response, resu }{result1, result2} } +func (fake *FakeClient) PutUserLimits(arg1 string, arg2 rabbithole.UserLimitsValues) (*http.Response, error) { + fake.putUserLimitsMutex.Lock() + ret, specificReturn := fake.putUserLimitsReturnsOnCall[len(fake.putUserLimitsArgsForCall)] + fake.putUserLimitsArgsForCall = append(fake.putUserLimitsArgsForCall, struct { + arg1 string + arg2 rabbithole.UserLimitsValues + }{arg1, arg2}) + stub := fake.PutUserLimitsStub + fakeReturns := fake.putUserLimitsReturns + fake.recordInvocation("PutUserLimits", []interface{}{arg1, arg2}) + fake.putUserLimitsMutex.Unlock() + if stub != nil { + return stub(arg1, arg2) + } + if specificReturn { + return ret.result1, ret.result2 + } + return fakeReturns.result1, fakeReturns.result2 +} + +func (fake *FakeClient) PutUserLimitsCallCount() int { + fake.putUserLimitsMutex.RLock() + defer fake.putUserLimitsMutex.RUnlock() + return len(fake.putUserLimitsArgsForCall) +} + +func (fake *FakeClient) PutUserLimitsCalls(stub func(string, rabbithole.UserLimitsValues) (*http.Response, error)) { + fake.putUserLimitsMutex.Lock() + defer fake.putUserLimitsMutex.Unlock() + fake.PutUserLimitsStub = stub +} + +func (fake *FakeClient) PutUserLimitsArgsForCall(i int) (string, rabbithole.UserLimitsValues) { + fake.putUserLimitsMutex.RLock() + defer fake.putUserLimitsMutex.RUnlock() + argsForCall := fake.putUserLimitsArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2 +} + +func (fake *FakeClient) PutUserLimitsReturns(result1 *http.Response, result2 error) { + fake.putUserLimitsMutex.Lock() + defer fake.putUserLimitsMutex.Unlock() + fake.PutUserLimitsStub = nil + fake.putUserLimitsReturns = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + +func (fake *FakeClient) PutUserLimitsReturnsOnCall(i int, result1 *http.Response, result2 error) { + fake.putUserLimitsMutex.Lock() + defer fake.putUserLimitsMutex.Unlock() + fake.PutUserLimitsStub = nil + if fake.putUserLimitsReturnsOnCall == nil { + fake.putUserLimitsReturnsOnCall = make(map[int]struct { + result1 *http.Response + result2 error + }) + } + fake.putUserLimitsReturnsOnCall[i] = struct { + result1 *http.Response + result2 error + }{result1, result2} +} + func (fake *FakeClient) PutVhost(arg1 string, arg2 rabbithole.VhostSettings) (*http.Response, error) { fake.putVhostMutex.Lock() ret, specificReturn := fake.putVhostReturnsOnCall[len(fake.putVhostArgsForCall)] @@ -2275,10 +2510,14 @@ func (fake *FakeClient) Invocations() map[string][][]interface{} { defer fake.deleteTopicPermissionsInMutex.RUnlock() fake.deleteUserMutex.RLock() defer fake.deleteUserMutex.RUnlock() + fake.deleteUserLimitsMutex.RLock() + defer fake.deleteUserLimitsMutex.RUnlock() fake.deleteVhostMutex.RLock() defer fake.deleteVhostMutex.RUnlock() fake.getQueueMutex.RLock() defer fake.getQueueMutex.RUnlock() + fake.getUserLimitsMutex.RLock() + defer fake.getUserLimitsMutex.RUnlock() fake.getVhostMutex.RLock() defer fake.getVhostMutex.RUnlock() fake.listExchangeBindingsBetweenMutex.RLock() @@ -2295,6 +2534,8 @@ func (fake *FakeClient) Invocations() map[string][][]interface{} { defer fake.putPolicyMutex.RUnlock() fake.putUserMutex.RLock() defer fake.putUserMutex.RUnlock() + fake.putUserLimitsMutex.RLock() + defer fake.putUserLimitsMutex.RUnlock() fake.putVhostMutex.RLock() defer fake.putVhostMutex.RUnlock() fake.updatePermissionsInMutex.RLock() diff --git a/system_tests/exchange_system_test.go b/system_tests/exchange_system_test.go index 540bddf8..c15f6211 100644 --- a/system_tests/exchange_system_test.go +++ b/system_tests/exchange_system_test.go @@ -44,6 +44,10 @@ var _ = Describe("Exchange", func() { } }) + AfterEach(func() { + _ = k8sClient.Delete(ctx, exchange) + }) + It("declares and deletes a exchange successfully", func() { By("declaring exchange") Expect(k8sClient.Create(ctx, exchange, &client.CreateOptions{})).To(Succeed()) diff --git a/system_tests/operatorpolicy_system_test.go b/system_tests/operatorpolicy_system_test.go index 27f74950..326ed1d5 100644 --- a/system_tests/operatorpolicy_system_test.go +++ b/system_tests/operatorpolicy_system_test.go @@ -44,6 +44,10 @@ var _ = Describe("OperatorPolicy", func() { } }) + AfterEach(func() { + _ = k8sClient.Delete(ctx, policy) + }) + It("creates, updates and deletes an operator policy successfully", func() { By("creating operator policy") Expect(k8sClient.Create(ctx, policy, &client.CreateOptions{})).To(Succeed()) diff --git a/system_tests/policy_system_test.go b/system_tests/policy_system_test.go index a730f556..42312f8d 100644 --- a/system_tests/policy_system_test.go +++ b/system_tests/policy_system_test.go @@ -43,6 +43,10 @@ var _ = Describe("Policy", func() { } }) + AfterEach(func() { + _ = k8sClient.Delete(ctx, policy) + }) + It("creates, updates and deletes a policy successfully", func() { By("creating policy") Expect(k8sClient.Create(ctx, policy, &client.CreateOptions{})).To(Succeed()) diff --git a/system_tests/queue_system_test.go b/system_tests/queue_system_test.go index b86cc12a..12bb4092 100644 --- a/system_tests/queue_system_test.go +++ b/system_tests/queue_system_test.go @@ -45,6 +45,10 @@ var _ = Describe("Queue Controller", func() { } }) + AfterEach(func() { + _ = k8sClient.Delete(ctx, q) + }) + It("declares and deletes a queue successfully", func() { By("declaring queue") Expect(k8sClient.Create(ctx, q, &client.CreateOptions{})).To(Succeed()) diff --git a/system_tests/user_system_test.go b/system_tests/user_system_test.go index 74b9a0be..73ef1134 100644 --- a/system_tests/user_system_test.go +++ b/system_tests/user_system_test.go @@ -22,6 +22,10 @@ var _ = Describe("Users", func() { user *topology.User ) + AfterEach(func() { + _ = k8sClient.Delete(ctx, user) + }) + When("relying on the operator to generate a username and password", func() { BeforeEach(func() { user = &topology.User{ @@ -388,4 +392,81 @@ var _ = Describe("Users", func() { Expect(user.PasswordHash).To(Equal("")) }) }) + + When("providing user limits", func() { + const username = "limits-user" + var credentialSecret corev1.Secret + var connections, channels int32 + BeforeEach(func() { + credentialSecret = corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "user-limit-secret", + Namespace: namespace, + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "username": []byte(username), + }, + } + Expect(k8sClient.Create(ctx, &credentialSecret, &client.CreateOptions{})).To(Succeed()) + + connections = 4 + channels = 6 + user = &topology.User{ + ObjectMeta: metav1.ObjectMeta{ + Name: username, + Namespace: namespace, + }, + Spec: topology.UserSpec{ + RabbitmqClusterReference: topology.RabbitmqClusterReference{ + Name: rmq.Name, + }, + ImportCredentialsSecret: &corev1.LocalObjectReference{ + Name: credentialSecret.Name, + }, + UserLimits: &topology.UserLimits{ + Connections: &connections, + Channels: &channels, + }, + }, + } + }) + + AfterEach(func() { + Expect(k8sClient.Delete(ctx, &credentialSecret)).To(Succeed()) + }) + + It("Creates and deletes a user with the specified limits", func() { + var err error + By("declaring user") + Expect(k8sClient.Create(ctx, user, &client.CreateOptions{})).To(Succeed()) + + By("setting the correct user limits") + var userLimitsInfo []rabbithole.UserLimitsInfo + Eventually(func() error { + userLimitsInfo, err = rabbitClient.GetUserLimits(username) + return err + }, 30, 2).Should(BeNil()) + Expect(userLimitsInfo).To(HaveLen(1)) + Expect(userLimitsInfo[0].User).To(Equal(username)) + Expect(userLimitsInfo[0].Value).To(HaveKeyWithValue("max-connections", int(connections))) + Expect(userLimitsInfo[0].Value).To(HaveKeyWithValue("max-channels", int(channels))) + + By("deleting user") + Expect(k8sClient.Delete(ctx, user)).To(Succeed()) + Eventually(func() error { + _, err = rabbitClient.GetUser(username) + return err + }, 30).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("Object Not Found")) + + By("deleting the user limits") + Eventually(func() error { + userLimitsInfo, err = rabbitClient.GetUserLimits(username) + return err + }, 10, 2).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("Not Found")) + Expect(userLimitsInfo).To(BeEmpty()) + }) + }) }) diff --git a/system_tests/vhost_system_test.go b/system_tests/vhost_system_test.go index af892706..d81fb8d6 100644 --- a/system_tests/vhost_system_test.go +++ b/system_tests/vhost_system_test.go @@ -39,6 +39,10 @@ var _ = Describe("vhost", func() { } }) + AfterEach(func() { + _ = k8sClient.Delete(ctx, vhost) + }) + It("creates and deletes a vhost successfully", func() { By("creating a vhost") Expect(k8sClient.Create(ctx, vhost, &client.CreateOptions{})).To(Succeed())