Skip to content

Commit 172b735

Browse files
authored
Merge pull request #961 from aawoznia26/922_controlling_resources_deletion
[#922] controlling resources deletion
2 parents e10a1ec + 455877c commit 172b735

35 files changed

+510
-6
lines changed

api/v1beta1/federation_types.go

+5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ type FederationSpec struct {
3535
TrustUserId bool `json:"trustUserId,omitempty"`
3636
Exchange string `json:"exchange,omitempty"`
3737
Queue string `json:"queue,omitempty"`
38+
// DeletionPolicy defines the behavior of federation in the RabbitMQ cluster when the corresponding custom resource is deleted.
39+
// Can be set to 'delete' or 'retain'. Default is 'delete'.
40+
// +kubebuilder:validation:Enum=delete;retain
41+
// +kubebuilder:default:=delete
42+
DeletionPolicy string `json:"deletionPolicy,omitempty"`
3843
}
3944

4045
// FederationStatus defines the observed state of Federation

api/v1beta1/federation_types_test.go

+29
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ var _ = Describe("Federation spec", func() {
2525
UriSecret: &corev1.LocalObjectReference{
2626
Name: "a-secret",
2727
},
28+
DeletionPolicy: "delete",
2829
}
2930

3031
federation := Federation{
@@ -122,4 +123,32 @@ var _ = Describe("Federation spec", func() {
122123
Expect(k8sClient.Create(ctx, &federation)).To(MatchError(`Federation.rabbitmq.com "invalid-federation" is invalid: spec.ackMode: Unsupported value: "non-existing-ackmode": supported values: "on-confirm", "on-publish", "no-ack"`))
123124
})
124125
})
126+
127+
It("creates a federation with non-default DeletionPolicy", func() {
128+
federation := Federation{
129+
ObjectMeta: metav1.ObjectMeta{
130+
Name: "federation-with-retain-policy",
131+
Namespace: namespace,
132+
},
133+
Spec: FederationSpec{
134+
Name: "federation-with-retain-policy",
135+
DeletionPolicy: "retain",
136+
UriSecret: &corev1.LocalObjectReference{
137+
Name: "a-secret",
138+
},
139+
RabbitmqClusterReference: RabbitmqClusterReference{
140+
Name: "some-cluster",
141+
},
142+
},
143+
}
144+
Expect(k8sClient.Create(ctx, &federation)).To(Succeed())
145+
fetched := &Federation{}
146+
Expect(k8sClient.Get(ctx, types.NamespacedName{
147+
Name: federation.Name,
148+
Namespace: federation.Namespace,
149+
}, fetched)).To(Succeed())
150+
151+
Expect(fetched.Spec.DeletionPolicy).To(Equal("retain"))
152+
Expect(fetched.Spec.Name).To(Equal("federation-with-retain-policy"))
153+
})
125154
})

api/v1beta1/federation_webhook_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -115,5 +115,12 @@ var _ = Describe("federation webhook", func() {
115115
_, err := newFederation.ValidateUpdate(rootCtx, &federation, newFederation)
116116
Expect(err).To(Succeed())
117117
})
118+
119+
It("allows updates on federation.spec.deletionPolicy", func() {
120+
newFederation := federation.DeepCopy()
121+
newFederation.Spec.DeletionPolicy = "retain"
122+
_, err := newFederation.ValidateUpdate(rootCtx, &federation, newFederation)
123+
Expect(err).ToNot(HaveOccurred())
124+
})
118125
})
119126
})

api/v1beta1/queue_types.go

+5
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ type QueueSpec struct {
4545
// Required property.
4646
// +kubebuilder:validation:Required
4747
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
48+
// DeletionPolicy defines the behavior of queue in the RabbitMQ cluster when the corresponding custom resource is deleted.
49+
// Can be set to 'delete' or 'retain'. Default is 'delete'.
50+
// +kubebuilder:validation:Enum=delete;retain
51+
// +kubebuilder:default:=delete
52+
DeletionPolicy string `json:"deletionPolicy,omitempty"`
4853
}
4954

5055
// QueueStatus defines the observed state of Queue

api/v1beta1/queue_types_test.go

+30-4
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ var _ = Describe("Queue spec", func() {
1717

1818
It("creates a queue with default settings", func() {
1919
expectedSpec := QueueSpec{
20-
Name: "test-queue",
21-
Vhost: "/",
22-
Durable: false,
23-
AutoDelete: false,
20+
Name: "test-queue",
21+
Vhost: "/",
22+
Durable: false,
23+
AutoDelete: false,
24+
DeletionPolicy: "delete",
2425
RabbitmqClusterReference: RabbitmqClusterReference{
2526
Name: "some-cluster",
2627
},
@@ -85,4 +86,29 @@ var _ = Describe("Queue spec", func() {
8586
}))
8687
Expect(fetchedQ.Spec.Arguments.Raw).To(Equal([]byte(`{"yoyo":10}`)))
8788
})
89+
90+
It("creates a queue with non-default DeletionPolicy", func() {
91+
q := Queue{
92+
ObjectMeta: metav1.ObjectMeta{
93+
Name: "queue-with-retain-policy",
94+
Namespace: namespace,
95+
},
96+
Spec: QueueSpec{
97+
Name: "queue-with-retain-policy",
98+
DeletionPolicy: "retain",
99+
RabbitmqClusterReference: RabbitmqClusterReference{
100+
Name: "random-cluster",
101+
},
102+
},
103+
}
104+
Expect(k8sClient.Create(ctx, &q)).To(Succeed())
105+
fetchedQ := &Queue{}
106+
Expect(k8sClient.Get(ctx, types.NamespacedName{
107+
Name: q.Name,
108+
Namespace: q.Namespace,
109+
}, fetchedQ)).To(Succeed())
110+
111+
Expect(fetchedQ.Spec.DeletionPolicy).To(Equal("retain"))
112+
Expect(fetchedQ.Spec.Name).To(Equal("queue-with-retain-policy"))
113+
})
88114
})

api/v1beta1/queue_webhook_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -154,5 +154,12 @@ var _ = Describe("queue webhook", func() {
154154
_, err = newQueue.ValidateUpdate(rootCtx, &queue, newQueue)
155155
Expect(err).To(MatchError(ContainSubstring("queue arguments cannot be updated")))
156156
})
157+
158+
It("allows updates on queue.spec.deletionPolicy", func() {
159+
newQueue := queue.DeepCopy()
160+
newQueue.Spec.DeletionPolicy = "retain"
161+
_, err := newQueue.ValidateUpdate(rootCtx, &queue, newQueue)
162+
Expect(err).ToNot(HaveOccurred())
163+
})
157164
})
158165
})

api/v1beta1/shovel_types.go

+5
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ type ShovelSpec struct {
8484
SourceConsumerArgs *runtime.RawExtension `json:"srcConsumerArgs,omitempty"`
8585
// amqp10 configuration; required if srcProtocol is amqp10
8686
SourceAddress string `json:"srcAddress,omitempty"`
87+
// DeletionPolicy defines the behavior of shovel in the RabbitMQ cluster when the corresponding custom resource is deleted.
88+
// Can be set to 'delete' or 'retain'. Default is 'delete'.
89+
// +kubebuilder:validation:Enum=delete;retain
90+
// +kubebuilder:default:=delete
91+
DeletionPolicy string `json:"deletionPolicy,omitempty"`
8792
}
8893

8994
// ShovelStatus defines the observed state of Shovel

api/v1beta1/shovel_types_test.go

+32
Original file line numberDiff line numberDiff line change
@@ -193,4 +193,36 @@ var _ = Describe("Shovel spec", func() {
193193
Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-srcprotocol" is invalid: spec.srcProtocol: Unsupported value: "mqtt": supported values: "amqp091", "amqp10"`))
194194
})
195195
})
196+
197+
It("creates a shovel with non-default DeletionPolicy", func() {
198+
shovel := Shovel{
199+
ObjectMeta: metav1.ObjectMeta{
200+
Name: "shovel-with-retain-policy",
201+
Namespace: namespace,
202+
},
203+
Spec: ShovelSpec{
204+
Name: "shovel-with-retain-policy",
205+
DeletionPolicy: "retain",
206+
RabbitmqClusterReference: RabbitmqClusterReference{
207+
Name: "some-cluster",
208+
},
209+
UriSecret: &corev1.LocalObjectReference{
210+
Name: "a-secret",
211+
},
212+
},
213+
}
214+
Expect(k8sClient.Create(ctx, &shovel)).To(Succeed())
215+
fetched := &Shovel{}
216+
Expect(k8sClient.Get(ctx, types.NamespacedName{
217+
Name: shovel.Name,
218+
Namespace: shovel.Namespace,
219+
}, fetched)).To(Succeed())
220+
221+
Expect(fetched.Spec.DeletionPolicy).To(Equal("retain"))
222+
Expect(fetched.Spec.Name).To(Equal("shovel-with-retain-policy"))
223+
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
224+
Name: "some-cluster",
225+
}))
226+
Expect(fetched.Spec.UriSecret.Name).To(Equal("a-secret"))
227+
})
196228
})

api/v1beta1/shovel_webhook_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -179,5 +179,12 @@ var _ = Describe("shovel webhook", func() {
179179
_, err := newShovel.ValidateUpdate(rootCtx, &shovel, newShovel)
180180
Expect(err).ToNot(HaveOccurred())
181181
})
182+
183+
It("allows updates on shovel.spec.deletionPolicy", func() {
184+
newShovel := shovel.DeepCopy()
185+
newShovel.Spec.DeletionPolicy = "retain"
186+
_, err := newShovel.ValidateUpdate(rootCtx, &shovel, newShovel)
187+
Expect(err).ToNot(HaveOccurred())
188+
})
182189
})
183190
})

api/v1beta1/vhost_types.go

+5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ type VhostSpec struct {
2929
// Required property.
3030
// +kubebuilder:validation:Required
3131
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
32+
// DeletionPolicy defines the behavior of vhost in the RabbitMQ cluster when the corresponding custom resource is deleted.
33+
// Can be set to 'delete' or 'retain'. Default is 'delete'.
34+
// +kubebuilder:validation:Enum=delete;retain
35+
// +kubebuilder:default:=delete
36+
DeletionPolicy string `json:"deletionPolicy,omitempty"`
3237
}
3338

3439
// VhostStatus defines the observed state of Vhost

api/v1beta1/vhost_types_test.go

+28-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ var _ = Describe("Vhost", func() {
1616

1717
It("creates a vhost", func() {
1818
expectedSpec := VhostSpec{
19-
Name: "test-vhost",
20-
Tracing: false,
19+
Name: "test-vhost",
20+
Tracing: false,
21+
DeletionPolicy: "delete",
2122
RabbitmqClusterReference: RabbitmqClusterReference{
2223
Name: "some-cluster",
2324
},
@@ -134,4 +135,29 @@ var _ = Describe("Vhost", func() {
134135
Expect(k8sClient.Create(ctx, qTypeVhost)).To(MatchError(`Vhost.rabbitmq.com "some-vhost" is invalid: spec.defaultQueueType: Unsupported value: "aqueuetype": supported values: "quorum", "classic", "stream"`))
135136
})
136137
})
138+
139+
It("creates a vhost with non-default DeletionPolicy", func() {
140+
vhost := Vhost{
141+
ObjectMeta: metav1.ObjectMeta{
142+
Name: "vhost-with-retain-policy",
143+
Namespace: namespace,
144+
},
145+
Spec: VhostSpec{
146+
Name: "vhost-with-retain-policy",
147+
DeletionPolicy: "retain",
148+
RabbitmqClusterReference: RabbitmqClusterReference{
149+
Name: "random-cluster",
150+
},
151+
},
152+
}
153+
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
154+
fetched := &Vhost{}
155+
Expect(k8sClient.Get(ctx, types.NamespacedName{
156+
Name: vhost.Name,
157+
Namespace: vhost.Namespace,
158+
}, fetched)).To(Succeed())
159+
160+
Expect(fetched.Spec.DeletionPolicy).To(Equal("retain"))
161+
Expect(fetched.Spec.Name).To(Equal("vhost-with-retain-policy"))
162+
})
137163
})

api/v1beta1/vhost_webhook_test.go

+7
Original file line numberDiff line numberDiff line change
@@ -102,5 +102,12 @@ var _ = Describe("vhost webhook", func() {
102102
_, err := newVhost.ValidateUpdate(rootCtx, &vhost, newVhost)
103103
Expect(err).ToNot(HaveOccurred())
104104
})
105+
106+
It("allows updates on vhost.spec.deletionPolicy", func() {
107+
newVhost := vhost.DeepCopy()
108+
newVhost.Spec.DeletionPolicy = "retain"
109+
_, err := newVhost.ValidateUpdate(rootCtx, &vhost, newVhost)
110+
Expect(err).ToNot(HaveOccurred())
111+
})
105112
})
106113
})

config/crd/bases/rabbitmq.com_federations.yaml

+9
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,15 @@ spec:
4949
- on-publish
5050
- no-ack
5151
type: string
52+
deletionPolicy:
53+
default: delete
54+
description: |-
55+
DeletionPolicy defines the behavior of federation in the RabbitMQ cluster when the corresponding custom resource is deleted.
56+
Can be set to 'delete' or 'retain'. Default is 'delete'.
57+
enum:
58+
- delete
59+
- retain
60+
type: string
5261
exchange:
5362
type: string
5463
expires:

config/crd/bases/rabbitmq.com_queues.yaml

+9
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ spec:
5858
description: when set to true, queues are delete only if they have
5959
no consumer.
6060
type: boolean
61+
deletionPolicy:
62+
default: delete
63+
description: |-
64+
DeletionPolicy defines the behavior of queue in the RabbitMQ cluster when the corresponding custom resource is deleted.
65+
Can be set to 'delete' or 'retain'. Default is 'delete'.
66+
enum:
67+
- delete
68+
- retain
69+
type: string
6170
durable:
6271
description: When set to false queues does not survive server restart.
6372
type: boolean

config/crd/bases/rabbitmq.com_shovels.yaml

+9
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ spec:
5353
type: boolean
5454
deleteAfter:
5555
type: string
56+
deletionPolicy:
57+
default: delete
58+
description: |-
59+
DeletionPolicy defines the behavior of shovel in the RabbitMQ cluster when the corresponding custom resource is deleted.
60+
Can be set to 'delete' or 'retain'. Default is 'delete'.
61+
enum:
62+
- delete
63+
- retain
64+
type: string
5665
destAddForwardHeaders:
5766
type: boolean
5867
destAddTimestampHeader:

config/crd/bases/rabbitmq.com_vhosts.yaml

+9
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,15 @@ spec:
5050
- classic
5151
- stream
5252
type: string
53+
deletionPolicy:
54+
default: delete
55+
description: |-
56+
DeletionPolicy defines the behavior of vhost in the RabbitMQ cluster when the corresponding custom resource is deleted.
57+
Can be set to 'delete' or 'retain'. Default is 'delete'.
58+
enum:
59+
- delete
60+
- retain
61+
type: string
5362
name:
5463
description: Name of the vhost; see https://www.rabbitmq.com/vhosts.html.
5564
type: string

controllers/federation_controller.go

+3
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ func (r *FederationReconciler) getUri(ctx context.Context, federation *topology.
5454
func (r *FederationReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.Client, obj topology.TopologyResource) error {
5555
logger := ctrl.LoggerFrom(ctx)
5656
federation := obj.(*topology.Federation)
57+
if shouldSkipDeletion(ctx, federation.Spec.DeletionPolicy, federation.Spec.Name) {
58+
return nil
59+
}
5760
err := validateResponseForDeletion(client.DeleteFederationUpstream(federation.Spec.Vhost, federation.Spec.Name))
5861
if errors.Is(err, NotFound) {
5962
logger.Info("cannot find federation upstream parameter; no need to delete it", "federation", federation.Spec.Name)

controllers/federation_controller_test.go

+26
Original file line numberDiff line numberDiff line change
@@ -225,4 +225,30 @@ var _ = Describe("federation-controller", func() {
225225
})
226226
})
227227
})
228+
229+
When("the Federation has DeletionPolicy set to retain", func() {
230+
BeforeEach(func() {
231+
federationName = "federation-with-retain-policy"
232+
federation.Spec.DeletionPolicy = "retain"
233+
fakeRabbitMQClient.DeleteFederationUpstreamReturns(&http.Response{
234+
Status: "200 OK",
235+
StatusCode: http.StatusOK,
236+
}, nil)
237+
})
238+
239+
It("deletes the k8s resource but preserves the federation in RabbitMQ server", func() {
240+
Expect(k8sClient.Create(ctx, &federation)).To(Succeed())
241+
Expect(k8sClient.Delete(ctx, &federation)).To(Succeed())
242+
243+
Eventually(func() bool {
244+
err := k8sClient.Get(ctx, types.NamespacedName{Name: federation.Name, Namespace: federation.Namespace}, &federation)
245+
return apierrors.IsNotFound(err)
246+
}).
247+
Within(statusEventsUpdateTimeout).
248+
WithPolling(time.Second).
249+
Should(BeTrue())
250+
251+
Expect(fakeRabbitMQClient.DeleteFederationUpstreamCallCount()).To(Equal(0))
252+
})
253+
})
228254
})

controllers/queue_controller.go

+3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ func (r *QueueReconciler) DeleteFunc(ctx context.Context, client rabbitmqclient.
4747
logger.Info("Deleting queues from ReconcilerFunc DeleteObj")
4848

4949
queue := obj.(*topology.Queue)
50+
if shouldSkipDeletion(ctx, queue.Spec.DeletionPolicy, queue.Spec.Name) {
51+
return nil
52+
}
5053
queueDeleteOptions, err := internal.GenerateQueueDeleteOptions(queue)
5154
if err != nil {
5255
return fmt.Errorf("failed to generate queue delete options: %w", err)

0 commit comments

Comments
 (0)