Skip to content

Commit 65f96da

Browse files
Extend FederationSpec with QueueType and ResourceCleanupMode (#974)
* Extend FederationSpec with QueueType and ResourceCleanupMode
1 parent baa57a2 commit 65f96da

39 files changed

+117
-212
lines changed

api/v1beta1/federation_types.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,19 @@ type FederationSpec struct {
4040
// +kubebuilder:validation:Enum=delete;retain
4141
// +kubebuilder:default:=delete
4242
DeletionPolicy string `json:"deletionPolicy,omitempty"`
43+
// The queue type of the internal upstream queue used by exchange federation.
44+
// Defaults to classic (a single replica queue type). Set to quorum to use a replicated queue type.
45+
// Changing the queue type will delete and recreate the upstream queue by default.
46+
// This may lead to messages getting lost or not routed anywhere during the re-declaration.
47+
// To avoid that, set resource-cleanup-mode key to never.
48+
// This requires manually deleting the old upstream queue so that it can be recreated with the new type.
49+
// +kubebuilder:validation:Enum=classic;quorum
50+
QueueType string `json:"queueType,omitempty"`
51+
// Whether to delete the internal upstream queue when federation links stop.
52+
// By default, the internal upstream queue is deleted immediately when a federation link stops.
53+
// Set to never to keep the upstream queue around and collect messages even when changing federation configuration.
54+
// +kubebuilder:validation:Enum=default;never
55+
ResourceCleanupMode string `json:"resourceCleanupMode,omitempty"`
4356
}
4457

4558
// FederationStatus defines the observed state of Federation

api/v1beta1/federation_types_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,16 @@ var _ = Describe("Federation spec", func() {
6464
UriSecret: &corev1.LocalObjectReference{
6565
Name: "a-secret",
6666
},
67-
Expires: 1000,
68-
MessageTTL: 1000,
69-
MaxHops: 100,
70-
PrefetchCount: 50,
71-
ReconnectDelay: 10,
72-
TrustUserId: true,
73-
Exchange: "an-exchange",
74-
AckMode: "no-ack",
67+
Expires: 1000,
68+
MessageTTL: 1000,
69+
MaxHops: 100,
70+
PrefetchCount: 50,
71+
ReconnectDelay: 10,
72+
TrustUserId: true,
73+
Exchange: "an-exchange",
74+
AckMode: "no-ack",
75+
QueueType: "quorum",
76+
ResourceCleanupMode: "never",
7577
RabbitmqClusterReference: RabbitmqClusterReference{
7678
Name: "some-cluster",
7779
},
@@ -99,6 +101,8 @@ var _ = Describe("Federation spec", func() {
99101
Expect(fetched.Spec.MaxHops).To(Equal(100))
100102
Expect(fetched.Spec.PrefetchCount).To(Equal(50))
101103
Expect(fetched.Spec.ReconnectDelay).To(Equal(10))
104+
Expect(fetched.Spec.QueueType).To(Equal("quorum"))
105+
Expect(fetched.Spec.ResourceCleanupMode).To(Equal("never"))
102106
})
103107

104108
When("creating a federation with an invalid 'AckMode' value", func() {

config/crd/bases/rabbitmq.com_federations.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,18 @@ spec:
7373
type: integer
7474
queue:
7575
type: string
76+
queueType:
77+
description: |-
78+
The queue type of the internal upstream queue used by exchange federation.
79+
Defaults to classic (a single replica queue type). Set to quorum to use a replicated queue type.
80+
Changing the queue type will delete and recreate the upstream queue by default.
81+
This may lead to messages getting lost or not routed anywhere during the re-declaration.
82+
To avoid that, set resource-cleanup-mode key to never.
83+
This requires manually deleting the old upstream queue so that it can be recreated with the new type.
84+
enum:
85+
- classic
86+
- quorum
87+
type: string
7688
rabbitmqClusterReference:
7789
description: |-
7890
Reference to the RabbitmqCluster that this federation upstream will be created in.
@@ -108,6 +120,15 @@ spec:
108120
type: object
109121
reconnectDelay:
110122
type: integer
123+
resourceCleanupMode:
124+
description: |-
125+
Whether to delete the internal upstream queue when federation links stop.
126+
By default, the internal upstream queue is deleted immediately when a federation link stops.
127+
Set to never to keep the upstream queue around and collect messages even when changing federation configuration.
128+
enum:
129+
- default
130+
- never
131+
type: string
111132
trustUserId:
112133
type: boolean
113134
uriSecret:

controllers/binding_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
"reflect"
1818

1919
"github.com/go-logr/logr"
20-
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
20+
rabbithole "github.com/michaelklishin/rabbit-hole/v3"
2121
topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
2222
"github.com/rabbitmq/messaging-topology-operator/internal"
2323
"github.com/rabbitmq/messaging-topology-operator/rabbitmqclient"

docs/api/rabbitmq.com.ref.asciidoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,15 @@ Required property.
496496
| *`queue`* __string__ |
497497
| *`deletionPolicy`* __string__ | DeletionPolicy defines the behavior of federation in the RabbitMQ cluster when the corresponding custom resource is deleted.
498498
Can be set to 'delete' or 'retain'. Default is 'delete'.
499+
| *`queueType`* __string__ | The queue type of the internal upstream queue used by exchange federation.
500+
Defaults to classic (a single replica queue type). Set to quorum to use a replicated queue type.
501+
Changing the queue type will delete and recreate the upstream queue by default.
502+
This may lead to messages getting lost or not routed anywhere during the re-declaration.
503+
To avoid that, set resource-cleanup-mode key to never.
504+
This requires manually deleting the old upstream queue so that it can be recreated with the new type.
505+
| *`resourceCleanupMode`* __string__ | Whether to delete the internal upstream queue when federation links stop.
506+
By default, the internal upstream queue is deleted immediately when a federation link stops.
507+
Set to never to keep the upstream queue around and collect messages even when changing federation configuration.
499508
|===
500509

501510

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ require (
99
github.com/go-logr/logr v1.4.2
1010
github.com/google/uuid v1.6.0
1111
github.com/hashicorp/vault/api v1.16.0
12-
github.com/michaelklishin/rabbit-hole/v2 v2.16.0
12+
github.com/michaelklishin/rabbit-hole/v3 v3.2.0
1313
github.com/onsi/ginkgo/v2 v2.23.3
14-
github.com/onsi/gomega v1.36.3
14+
github.com/onsi/gomega v1.37.0
1515
github.com/rabbitmq/cluster-operator/v2 v2.12.1
1616
gopkg.in/ini.v1 v1.67.0
1717
k8s.io/api v0.32.3

0 commit comments

Comments
 (0)