Skip to content

Commit 3ba6164

Browse files
authored
Add SuperStreamCRD (#281)
## Summary Of Changes - Adds 1 new CRD: - SuperStream - This object represents a partitioned stream queue - one can specify the number of partitions per SuperStream - For `n` partitions, creates 1 exchange, `n` stream queues (a.k.a. 'partitions') and `n` bindings - `n` can be increased on an existing SuperStream; this will cause new partitions and bindings to be created - `n` cannot be decreased as this could lead to data loss - Routing keys for each of the bindings can be specified, or default to the index of the binding - Bindings are immutable in the topology operator, so these routing keys cannot be modified once set - If `n` is increased, additional routing keys must be specified if not left as the index of the binding ## Additional Context This CRD is compatible with any 3.9 RabbitMQ image where the stream plugin is enabled.
1 parent ae22d64 commit 3ba6164

File tree

69 files changed

+3202
-312
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

69 files changed

+3202
-312
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ manifests: install-tools
9696
# Generate API reference documentation
9797
api-reference:
9898
crd-ref-docs \
99-
--source-path ./api/v1beta1 \
99+
--source-path ./api \
100100
--config ./docs/api/autogen/config.yaml \
101101
--templates-dir ./docs/api/autogen/templates \
102102
--output-path ./docs/api/rabbitmq.com.ref.asciidoc \

PROJECT

+11
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,15 @@ resources:
9494
kind: Shovel
9595
path: github.com/rabbitmq/messaging-topology-operator/api/v1beta1
9696
version: v1beta1
97+
- api:
98+
crdVersion: v1
99+
namespaced: true
100+
domain: rabbitmq.com
101+
group: rabbitmq.com
102+
kind: SuperStream
103+
path: github.com/rabbitmq/messaging-topology-operator/api/v1alpha1
104+
version: v1alpha1
105+
webhooks:
106+
validation: true
107+
webhookVersion: v1
97108
version: "3"

api/v1alpha1/groupversion_info.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
RabbitMQ Messaging Topology Kubernetes Operator
3+
Copyright 2021 VMware, Inc.
4+
5+
This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License.
6+
7+
This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file.
8+
*/
9+
10+
// Package v1alpha1 contains API Schema definitions for the rabbitmq.com v1alpha1 API group
11+
//+kubebuilder:object:generate=true
12+
//+groupName=rabbitmq.com
13+
package v1alpha1
14+
15+
import (
16+
"k8s.io/apimachinery/pkg/runtime/schema"
17+
"sigs.k8s.io/controller-runtime/pkg/scheme"
18+
)
19+
20+
var (
21+
// GroupVersion is group version used to register these objects
22+
GroupVersion = schema.GroupVersion{Group: "rabbitmq.com", Version: "v1alpha1"}
23+
24+
// SchemeGroupVersion is group version used to register these objects
25+
// added for generated clientset
26+
SchemeGroupVersion = GroupVersion
27+
28+
// SchemeBuilder is used to add go types to the GroupVersionKind scheme
29+
SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}
30+
31+
// AddToScheme adds the types in this group-version to the given scheme.
32+
AddToScheme = SchemeBuilder.AddToScheme
33+
)
34+
35+
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
36+
// added for generated clientset
37+
func Kind(kind string) schema.GroupKind {
38+
return GroupVersion.WithKind(kind).GroupKind()
39+
}
40+
41+
// Resource takes an unqualified resource and returns a Group qualified GroupResource
42+
// added for generated clientset
43+
func Resource(resource string) schema.GroupResource {
44+
return GroupVersion.WithResource(resource).GroupResource()
45+
}

api/v1alpha1/suite_test.go

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package v1alpha1
2+
3+
import (
4+
"path/filepath"
5+
"testing"
6+
7+
. "github.com/onsi/ginkgo"
8+
. "github.com/onsi/gomega"
9+
10+
"k8s.io/client-go/kubernetes/scheme"
11+
"k8s.io/client-go/rest"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
13+
"sigs.k8s.io/controller-runtime/pkg/envtest"
14+
logf "sigs.k8s.io/controller-runtime/pkg/log"
15+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
16+
)
17+
18+
var cfg *rest.Config
19+
var k8sClient client.Client
20+
var testEnv *envtest.Environment
21+
22+
func TestAPIs(t *testing.T) {
23+
RegisterFailHandler(Fail)
24+
RunSpecs(t, "v1alpha1 Suite")
25+
}
26+
27+
var _ = BeforeSuite(func() {
28+
29+
logf.SetLogger(zap.New(zap.UseDevMode(true), zap.WriteTo(GinkgoWriter)))
30+
31+
By("bootstrapping test environment")
32+
testEnv = &envtest.Environment{
33+
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
34+
}
35+
36+
testEnv.ControlPlane.GetAPIServer().Configure().Set("bind-address", "127.0.0.1")
37+
38+
err := SchemeBuilder.AddToScheme(scheme.Scheme)
39+
Expect(err).NotTo(HaveOccurred())
40+
41+
cfg, err = testEnv.Start()
42+
Expect(err).ToNot(HaveOccurred())
43+
Expect(cfg).ToNot(BeNil())
44+
45+
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
46+
Expect(err).ToNot(HaveOccurred())
47+
Expect(k8sClient).ToNot(BeNil())
48+
})
49+
50+
var _ = AfterSuite(func() {
51+
By("tearing down the test environment")
52+
err := testEnv.Stop()
53+
Expect(err).ToNot(HaveOccurred())
54+
})

api/v1alpha1/superstream_types.go

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
RabbitMQ Messaging Topology Kubernetes Operator
3+
Copyright 2021 VMware, Inc.
4+
5+
This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License.
6+
7+
This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file.
8+
*/
9+
10+
package v1alpha1
11+
12+
import (
13+
topologyv1beta1 "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
14+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15+
"k8s.io/apimachinery/pkg/runtime/schema"
16+
)
17+
18+
// SuperStreamSpec defines the desired state of SuperStream
19+
type SuperStreamSpec struct {
20+
// Name of the queue; required property.
21+
// +kubebuilder:validation:Required
22+
Name string `json:"name"`
23+
// Default to vhost '/'; cannot be updated
24+
// +kubebuilder:default:=/
25+
Vhost string `json:"vhost,omitempty"`
26+
// Number of partitions to create within this super stream.
27+
// Defaults to '3'.
28+
// +kubebuilder:default:=3
29+
Partitions int `json:"partitions,omitempty"`
30+
// Routing keys to use for each of the partitions in the SuperStream
31+
// If unset, the routing keys for the partitions will be set to the index of the partitions
32+
// +kubebuilder:validation:Optional
33+
RoutingKeys []string `json:"routingKeys,omitempty"`
34+
// Reference to the RabbitmqCluster that the SuperStream will be created in.
35+
// Required property.
36+
// +kubebuilder:validation:Required
37+
RabbitmqClusterReference topologyv1beta1.RabbitmqClusterReference `json:"rabbitmqClusterReference"`
38+
}
39+
40+
// SuperStreamStatus defines the observed state of SuperStream
41+
type SuperStreamStatus struct {
42+
// observedGeneration is the most recent successful generation observed for this SuperStream. It corresponds to the
43+
// SuperStream's generation, which is updated on mutation by the API Server.
44+
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
45+
Conditions []topologyv1beta1.Condition `json:"conditions,omitempty"`
46+
// Partitions are a list of the stream queue names which form the partitions of this SuperStream.
47+
Partitions []string `json:"partitions,omitempty"`
48+
}
49+
50+
// +genclient
51+
// +kubebuilder:object:root=true
52+
// +kubebuilder:resource:categories=all
53+
// +kubebuilder:subresource:status
54+
55+
// SuperStream is the Schema for the queues API
56+
type SuperStream struct {
57+
metav1.TypeMeta `json:",inline"`
58+
metav1.ObjectMeta `json:"metadata,omitempty"`
59+
60+
Spec SuperStreamSpec `json:"spec,omitempty"`
61+
Status SuperStreamStatus `json:"status,omitempty"`
62+
}
63+
64+
// +kubebuilder:object:root=true
65+
66+
// SuperStreamList contains a list of SuperStreams
67+
type SuperStreamList struct {
68+
metav1.TypeMeta `json:",inline"`
69+
metav1.ListMeta `json:"metadata,omitempty"`
70+
Items []SuperStream `json:"items"`
71+
}
72+
73+
func (q *SuperStream) GroupResource() schema.GroupResource {
74+
return schema.GroupResource{
75+
Group: q.GroupVersionKind().Group,
76+
Resource: q.GroupVersionKind().Kind,
77+
}
78+
}
79+
80+
func init() {
81+
SchemeBuilder.Register(&SuperStream{}, &SuperStreamList{})
82+
}
+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package v1alpha1
2+
3+
import (
4+
"context"
5+
. "github.com/onsi/ginkgo"
6+
. "github.com/onsi/gomega"
7+
topologyv1beta1 "github.com/rabbitmq/messaging-topology-operator/api/v1beta1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/types"
10+
)
11+
12+
var _ = Describe("SuperStream spec", func() {
13+
var (
14+
namespace = "default"
15+
ctx = context.Background()
16+
)
17+
18+
It("creates a superstream with default settings", func() {
19+
expectedSpec := SuperStreamSpec{
20+
Name: "test-super-stream",
21+
Vhost: "/",
22+
Partitions: 3,
23+
RabbitmqClusterReference: topologyv1beta1.RabbitmqClusterReference{
24+
Name: "some-cluster",
25+
},
26+
}
27+
28+
superStream := SuperStream{
29+
ObjectMeta: metav1.ObjectMeta{
30+
Name: "test-super-stream",
31+
Namespace: namespace,
32+
},
33+
Spec: SuperStreamSpec{
34+
Name: "test-super-stream",
35+
RabbitmqClusterReference: topologyv1beta1.RabbitmqClusterReference{
36+
Name: "some-cluster",
37+
},
38+
},
39+
}
40+
Expect(k8sClient.Create(ctx, &superStream)).To(Succeed())
41+
fetchedSuperStream := &SuperStream{}
42+
Expect(k8sClient.Get(ctx, types.NamespacedName{
43+
Name: superStream.Name,
44+
Namespace: superStream.Namespace,
45+
}, fetchedSuperStream)).To(Succeed())
46+
Expect(fetchedSuperStream.Spec).To(Equal(expectedSpec))
47+
})
48+
49+
It("creates a superstream with specified settings", func() {
50+
expectedSpec := SuperStreamSpec{
51+
Name: "test-super-stream2",
52+
Vhost: "test-vhost",
53+
Partitions: 5,
54+
RabbitmqClusterReference: topologyv1beta1.RabbitmqClusterReference{
55+
Name: "some-cluster",
56+
},
57+
}
58+
59+
superStream := SuperStream{
60+
ObjectMeta: metav1.ObjectMeta{
61+
Name: "test-super-stream2",
62+
Namespace: namespace,
63+
},
64+
Spec: SuperStreamSpec{
65+
Name: "test-super-stream2",
66+
Vhost: "test-vhost",
67+
Partitions: 5,
68+
RabbitmqClusterReference: topologyv1beta1.RabbitmqClusterReference{
69+
Name: "some-cluster",
70+
},
71+
},
72+
}
73+
Expect(k8sClient.Create(ctx, &superStream)).To(Succeed())
74+
fetchedSuperStream := &SuperStream{}
75+
Expect(k8sClient.Get(ctx, types.NamespacedName{
76+
Name: superStream.Name,
77+
Namespace: superStream.Namespace,
78+
}, fetchedSuperStream)).To(Succeed())
79+
Expect(fetchedSuperStream.Spec).To(Equal(expectedSpec))
80+
})
81+
82+
})

api/v1alpha1/superstream_webhook.go

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
RabbitMQ Messaging Topology Kubernetes Operator
3+
Copyright 2021 VMware, Inc.
4+
5+
This product is licensed to you under the Mozilla Public License 2.0 license (the "License"). You may not use this product except in compliance with the Mozilla 2.0 License.
6+
7+
This product may include a number of subcomponents with separate copyright notices and license terms. Your use of these subcomponents is subject to the terms and conditions of the subcomponent's license, as noted in the LICENSE file.
8+
*/
9+
10+
package v1alpha1
11+
12+
import (
13+
"fmt"
14+
apierrors "k8s.io/apimachinery/pkg/api/errors"
15+
"k8s.io/apimachinery/pkg/runtime"
16+
"k8s.io/apimachinery/pkg/util/validation/field"
17+
ctrl "sigs.k8s.io/controller-runtime"
18+
"sigs.k8s.io/controller-runtime/pkg/webhook"
19+
)
20+
21+
func (s *SuperStream) SetupWebhookWithManager(mgr ctrl.Manager) error {
22+
return ctrl.NewWebhookManagedBy(mgr).
23+
For(s).
24+
Complete()
25+
}
26+
27+
// +kubebuilder:webhook:verbs=create;update,path=/validate-rabbitmq-com-v1alpha1-superstream,mutating=false,failurePolicy=fail,groups=rabbitmq.com,resources=superstreams,versions=v1alpha1,name=vsuperstream.kb.io,sideEffects=none,admissionReviewVersions=v1
28+
29+
var _ webhook.Validator = &SuperStream{}
30+
31+
// no validation on create
32+
func (s *SuperStream) ValidateCreate() error {
33+
return nil
34+
}
35+
36+
// returns error type 'forbidden' for updates on superstream name, vhost and rabbitmqClusterReference
37+
func (s *SuperStream) ValidateUpdate(old runtime.Object) error {
38+
oldSuperStream, ok := old.(*SuperStream)
39+
if !ok {
40+
return apierrors.NewBadRequest(fmt.Sprintf("expected a superstream but got a %T", old))
41+
}
42+
43+
detailMsg := "updates on name, vhost and rabbitmqClusterReference are all forbidden"
44+
if s.Spec.Name != oldSuperStream.Spec.Name {
45+
return apierrors.NewForbidden(s.GroupResource(), s.Name,
46+
field.Forbidden(field.NewPath("spec", "name"), detailMsg))
47+
}
48+
if s.Spec.Vhost != oldSuperStream.Spec.Vhost {
49+
return apierrors.NewForbidden(s.GroupResource(), s.Name,
50+
field.Forbidden(field.NewPath("spec", "vhost"), detailMsg))
51+
}
52+
53+
if s.Spec.RabbitmqClusterReference != oldSuperStream.Spec.RabbitmqClusterReference {
54+
return apierrors.NewForbidden(s.GroupResource(), s.Name,
55+
field.Forbidden(field.NewPath("spec", "rabbitmqClusterReference"), detailMsg))
56+
}
57+
58+
if !routingKeyUpdatePermitted(oldSuperStream.Spec.RoutingKeys, s.Spec.RoutingKeys) {
59+
return apierrors.NewForbidden(s.GroupResource(), s.Name,
60+
field.Forbidden(field.NewPath("spec", "routingKeys"), "updates may only add to the existing list of routing keys"))
61+
}
62+
63+
if s.Spec.Partitions < oldSuperStream.Spec.Partitions {
64+
return apierrors.NewForbidden(s.GroupResource(), s.Name,
65+
field.Forbidden(field.NewPath("spec", "partitions"), "updates may only increase the partition count, and may not decrease it"))
66+
}
67+
68+
return nil
69+
}
70+
71+
// ValidateDelete no validation on delete
72+
func (s *SuperStream) ValidateDelete() error {
73+
return nil
74+
}
75+
76+
// routingKeyUpdatePermitted allows updates only if adding additional keys at the end of the list of keys
77+
func routingKeyUpdatePermitted(old, new []string) bool {
78+
if len(old) == 0 && len(new) != 0 {
79+
return false
80+
}
81+
for i := 0; i < len(old); i++ {
82+
if old[i] != new[i] {
83+
return false
84+
}
85+
}
86+
return true
87+
}

0 commit comments

Comments
 (0)