Skip to content

[#922] controlling resources deletion #961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/v1beta1/federation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type FederationSpec struct {
TrustUserId bool `json:"trustUserId,omitempty"`
Exchange string `json:"exchange,omitempty"`
Queue string `json:"queue,omitempty"`
// DeletionPolicy defines the behavior of federation in the RabbitMQ cluster when the corresponding custom resource is deleted.
// Can be set to 'delete' or 'retain'. Default is 'delete'.
// +kubebuilder:validation:Enum=delete;retain
// +kubebuilder:default:=delete
DeletionPolicy string `json:"deletionPolicy,omitempty"`
}

// FederationStatus defines the observed state of Federation
Expand Down
29 changes: 29 additions & 0 deletions api/v1beta1/federation_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var _ = Describe("Federation spec", func() {
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
DeletionPolicy: "delete",
}

federation := Federation{
Expand Down Expand Up @@ -122,4 +123,32 @@ var _ = Describe("Federation spec", func() {
Expect(k8sClient.Create(ctx, &federation)).To(MatchError(`Federation.rabbitmq.com "invalid-federation" is invalid: spec.ackMode: Unsupported value: "non-existing-ackmode": supported values: "on-confirm", "on-publish", "no-ack"`))
})
})

It("creates a federation with non-default DeletionPolicy", func() {
federation := Federation{
ObjectMeta: metav1.ObjectMeta{
Name: "federation-with-retain-policy",
Namespace: namespace,
},
Spec: FederationSpec{
Name: "federation-with-retain-policy",
DeletionPolicy: "retain",
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &federation)).To(Succeed())
fetched := &Federation{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: federation.Name,
Namespace: federation.Namespace,
}, fetched)).To(Succeed())

Expect(fetched.Spec.DeletionPolicy).To(Equal("retain"))
Expect(fetched.Spec.Name).To(Equal("federation-with-retain-policy"))
})
})
7 changes: 7 additions & 0 deletions api/v1beta1/federation_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,12 @@ var _ = Describe("federation webhook", func() {
_, err := newFederation.ValidateUpdate(rootCtx, &federation, newFederation)
Expect(err).To(Succeed())
})

It("allows updates on federation.spec.deletionPolicy", func() {
newFederation := federation.DeepCopy()
newFederation.Spec.DeletionPolicy = "retain"
_, err := newFederation.ValidateUpdate(rootCtx, &federation, newFederation)
Expect(err).ToNot(HaveOccurred())
})
})
})
5 changes: 5 additions & 0 deletions api/v1beta1/queue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ type QueueSpec struct {
// Required property.
// +kubebuilder:validation:Required
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
// DeletionPolicy defines the behavior of queue in the RabbitMQ cluster when the corresponding custom resource is deleted.
// Can be set to 'delete' or 'retain'. Default is 'delete'.
// +kubebuilder:validation:Enum=delete;retain
// +kubebuilder:default:=delete
DeletionPolicy string `json:"deletionPolicy,omitempty"`
}

// QueueStatus defines the observed state of Queue
Expand Down
34 changes: 30 additions & 4 deletions api/v1beta1/queue_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ var _ = Describe("Queue spec", func() {

It("creates a queue with default settings", func() {
expectedSpec := QueueSpec{
Name: "test-queue",
Vhost: "/",
Durable: false,
AutoDelete: false,
Name: "test-queue",
Vhost: "/",
Durable: false,
AutoDelete: false,
DeletionPolicy: "delete",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
Expand Down Expand Up @@ -85,4 +86,29 @@ var _ = Describe("Queue spec", func() {
}))
Expect(fetchedQ.Spec.Arguments.Raw).To(Equal([]byte(`{"yoyo":10}`)))
})

It("creates a queue with non-default DeletionPolicy", func() {
q := Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "queue-with-retain-policy",
Namespace: namespace,
},
Spec: QueueSpec{
Name: "queue-with-retain-policy",
DeletionPolicy: "retain",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &q)).To(Succeed())
fetchedQ := &Queue{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: q.Name,
Namespace: q.Namespace,
}, fetchedQ)).To(Succeed())

Expect(fetchedQ.Spec.DeletionPolicy).To(Equal("retain"))
Expect(fetchedQ.Spec.Name).To(Equal("queue-with-retain-policy"))
})
})
7 changes: 7 additions & 0 deletions api/v1beta1/queue_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,5 +154,12 @@ var _ = Describe("queue webhook", func() {
_, err = newQueue.ValidateUpdate(rootCtx, &queue, newQueue)
Expect(err).To(MatchError(ContainSubstring("queue arguments cannot be updated")))
})

It("allows updates on queue.spec.deletionPolicy", func() {
newQueue := queue.DeepCopy()
newQueue.Spec.DeletionPolicy = "retain"
_, err := newQueue.ValidateUpdate(rootCtx, &queue, newQueue)
Expect(err).ToNot(HaveOccurred())
})
})
})
5 changes: 5 additions & 0 deletions api/v1beta1/shovel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ type ShovelSpec struct {
SourceConsumerArgs *runtime.RawExtension `json:"srcConsumerArgs,omitempty"`
// amqp10 configuration; required if srcProtocol is amqp10
SourceAddress string `json:"srcAddress,omitempty"`
// DeletionPolicy defines the behavior of shovel in the RabbitMQ cluster when the corresponding custom resource is deleted.
// Can be set to 'delete' or 'retain'. Default is 'delete'.
// +kubebuilder:validation:Enum=delete;retain
// +kubebuilder:default:=delete
DeletionPolicy string `json:"deletionPolicy,omitempty"`
}

// ShovelStatus defines the observed state of Shovel
Expand Down
32 changes: 32 additions & 0 deletions api/v1beta1/shovel_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,36 @@ var _ = Describe("Shovel spec", func() {
Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-srcprotocol" is invalid: spec.srcProtocol: Unsupported value: "mqtt": supported values: "amqp091", "amqp10"`))
})
})

It("creates a shovel with non-default DeletionPolicy", func() {
shovel := Shovel{
ObjectMeta: metav1.ObjectMeta{
Name: "shovel-with-retain-policy",
Namespace: namespace,
},
Spec: ShovelSpec{
Name: "shovel-with-retain-policy",
DeletionPolicy: "retain",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
},
}
Expect(k8sClient.Create(ctx, &shovel)).To(Succeed())
fetched := &Shovel{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: shovel.Name,
Namespace: shovel.Namespace,
}, fetched)).To(Succeed())

Expect(fetched.Spec.DeletionPolicy).To(Equal("retain"))
Expect(fetched.Spec.Name).To(Equal("shovel-with-retain-policy"))
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "some-cluster",
}))
Expect(fetched.Spec.UriSecret.Name).To(Equal("a-secret"))
})
})
7 changes: 7 additions & 0 deletions api/v1beta1/shovel_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,5 +179,12 @@ var _ = Describe("shovel webhook", func() {
_, err := newShovel.ValidateUpdate(rootCtx, &shovel, newShovel)
Expect(err).ToNot(HaveOccurred())
})

It("allows updates on shovel.spec.deletionPolicy", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DeletionPolicy = "retain"
_, err := newShovel.ValidateUpdate(rootCtx, &shovel, newShovel)
Expect(err).ToNot(HaveOccurred())
})
})
})
5 changes: 5 additions & 0 deletions api/v1beta1/vhost_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type VhostSpec struct {
// Required property.
// +kubebuilder:validation:Required
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
// DeletionPolicy defines the behavior of vhost in the RabbitMQ cluster when the corresponding custom resource is deleted.
// Can be set to 'delete' or 'retain'. Default is 'delete'.
// +kubebuilder:validation:Enum=delete;retain
// +kubebuilder:default:=delete
DeletionPolicy string `json:"deletionPolicy,omitempty"`
}

// VhostStatus defines the observed state of Vhost
Expand Down
30 changes: 28 additions & 2 deletions api/v1beta1/vhost_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ var _ = Describe("Vhost", func() {

It("creates a vhost", func() {
expectedSpec := VhostSpec{
Name: "test-vhost",
Tracing: false,
Name: "test-vhost",
Tracing: false,
DeletionPolicy: "delete",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
Expand Down Expand Up @@ -134,4 +135,29 @@ var _ = Describe("Vhost", func() {
Expect(k8sClient.Create(ctx, qTypeVhost)).To(MatchError(`Vhost.rabbitmq.com "some-vhost" is invalid: spec.defaultQueueType: Unsupported value: "aqueuetype": supported values: "quorum", "classic", "stream"`))
})
})

It("creates a vhost with non-default DeletionPolicy", func() {
vhost := Vhost{
ObjectMeta: metav1.ObjectMeta{
Name: "vhost-with-retain-policy",
Namespace: namespace,
},
Spec: VhostSpec{
Name: "vhost-with-retain-policy",
DeletionPolicy: "retain",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
fetched := &Vhost{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: vhost.Name,
Namespace: vhost.Namespace,
}, fetched)).To(Succeed())

Expect(fetched.Spec.DeletionPolicy).To(Equal("retain"))
Expect(fetched.Spec.Name).To(Equal("vhost-with-retain-policy"))
})
})
7 changes: 7 additions & 0 deletions api/v1beta1/vhost_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,12 @@ var _ = Describe("vhost webhook", func() {
_, err := newVhost.ValidateUpdate(rootCtx, &vhost, newVhost)
Expect(err).ToNot(HaveOccurred())
})

It("allows updates on vhost.spec.deletionPolicy", func() {
newVhost := vhost.DeepCopy()
newVhost.Spec.DeletionPolicy = "retain"
_, err := newVhost.ValidateUpdate(rootCtx, &vhost, newVhost)
Expect(err).ToNot(HaveOccurred())
})
})
})
9 changes: 9 additions & 0 deletions config/crd/bases/rabbitmq.com_federations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ spec:
- on-publish
- no-ack
type: string
deletionPolicy:
default: delete
description: |-
DeletionPolicy defines the behavior of federation in the RabbitMQ cluster when the corresponding custom resource is deleted.
Can be set to 'delete' or 'retain'. Default is 'delete'.
enum:
- delete
- retain
type: string
exchange:
type: string
expires:
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/rabbitmq.com_queues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ spec:
description: when set to true, queues are delete only if they have
no consumer.
type: boolean
deletionPolicy:
default: delete
description: |-
DeletionPolicy defines the behavior of queue in the RabbitMQ cluster when the corresponding custom resource is deleted.
Can be set to 'delete' or 'retain'. Default is 'delete'.
enum:
- delete
- retain
type: string
durable:
description: When set to false queues does not survive server restart.
type: boolean
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/rabbitmq.com_shovels.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ spec:
type: boolean
deleteAfter:
type: string
deletionPolicy:
default: delete
description: |-
DeletionPolicy defines the behavior of shovel in the RabbitMQ cluster when the corresponding custom resource is deleted.
Can be set to 'delete' or 'retain'. Default is 'delete'.
enum:
- delete
- retain
type: string
destAddForwardHeaders:
type: boolean
destAddTimestampHeader:
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/rabbitmq.com_vhosts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ spec:
- classic
- stream
type: string
deletionPolicy:
default: delete
description: |-
DeletionPolicy defines the behavior of vhost in the RabbitMQ cluster when the corresponding custom resource is deleted.
Can be set to 'delete' or 'retain'. Default is 'delete'.
enum:
- delete
- retain
type: string
name:
description: Name of the vhost; see https://www.rabbitmq.com/vhosts.html.
type: string
Expand Down
3 changes: 3 additions & 0 deletions controllers/federation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func (r *FederationReconciler) getUri(ctx context.Context, federation *topology.
func (r *FederationReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error {
logger := ctrl.LoggerFrom(ctx)
federation := obj.(*topology.Federation)
if shouldSkipDeletion(ctx, federation.Spec.DeletionPolicy, federation.Spec.Name) {
return nil
}
err := validateResponseForDeletion(client.DeleteFederationUpstream(federation.Spec.Vhost, federation.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find federation upstream parameter; no need to delete it", "federation", federation.Spec.Name)
Expand Down
26 changes: 26 additions & 0 deletions controllers/federation_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,30 @@ var _ = Describe("federation-controller", func() {
})
})
})

When("the Federation has DeletionPolicy set to retain", func() {
BeforeEach(func() {
federationName = "federation-with-retain-policy"
federation.Spec.DeletionPolicy = "retain"
fakeRabbitMQClient.DeleteFederationUpstreamReturns(&http.Response{
Status: "200 OK",
StatusCode: http.StatusOK,
}, nil)
})

It("deletes the k8s resource but preserves the federation in RabbitMQ server", func() {
Expect(k8sClient.Create(ctx, &federation)).To(Succeed())
Expect(k8sClient.Delete(ctx, &federation)).To(Succeed())

Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &federation)
return apierrors.IsNotFound(err)
}).
Within(statusEventsUpdateTimeout).
WithPolling(time.Second).
Should(BeTrue())

Expect(fakeRabbitMQClient.DeleteFederationUpstreamCallCount()).To(Equal(0))
})
})
})
3 changes: 3 additions & 0 deletions controllers/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func (r *QueueReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.
logger.Info("Deleting queues from ReconcilerFunc DeleteObj")

queue := obj.(*topology.Queue)
if shouldSkipDeletion(ctx, queue.Spec.DeletionPolicy, queue.Spec.Name) {
return nil
}
queueDeleteOptions, err := internal.GenerateQueueDeleteOptions(queue)
if err != nil {
return fmt.Errorf("failed to generate queue delete options: %w", err)
Expand Down
Loading
Loading