Skip to content

Commit 0c4d537

Browse files
authored
Add statefulset partition controller (#633)
* Add statefulset Partition controller Signed-off-by: d-kuro <[email protected]> * Fix force rolling update annotation test case. Signed-off-by: d-kuro <[email protected]> * Fix check partition logic. Signed-off-by: d-kuro <[email protected]> * Give partition in create. Signed-off-by: d-kuro <[email protected]> * Addressed review comments. Signed-off-by: d-kuro <[email protected]> * Add pvc template update e2e tests. Signed-off-by: d-kuro <[email protected]> * Remove unnecessary new line. * Fix comments Signed-off-by: d-kuro <[email protected]> * Fix log fields related to Pod Signed-off-by: d-kuro <[email protected]> * Add e2e for should not start rollout Signed-off-by: d-kuro <[email protected]> --------- Signed-off-by: d-kuro <[email protected]>
1 parent 98db868 commit 0c4d537

25 files changed

+1609
-2
lines changed

api/v1beta2/statefulset_webhhok.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package v1beta2
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/cybozu-go/moco/pkg/constants"
8+
admissionv1 "k8s.io/api/admission/v1"
9+
appsv1 "k8s.io/api/apps/v1"
10+
"k8s.io/apimachinery/pkg/api/equality"
11+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
12+
"k8s.io/apimachinery/pkg/runtime"
13+
"k8s.io/utils/ptr"
14+
ctrl "sigs.k8s.io/controller-runtime"
15+
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
16+
)
17+
18+
func SetupStatefulSetWebhookWithManager(mgr ctrl.Manager) error {
19+
return ctrl.NewWebhookManagedBy(mgr).
20+
For(&appsv1.StatefulSet{}).
21+
WithDefaulter(&StatefulSetDefaulter{}).
22+
Complete()
23+
}
24+
25+
//+kubebuilder:webhook:path=/mutate-apps-v1-statefulset,mutating=true,failurePolicy=fail,sideEffects=None,groups=apps,resources=statefulsets,verbs=create;update,versions=v1,name=statefulset.kb.io,admissionReviewVersions=v1
26+
27+
type StatefulSetDefaulter struct{}
28+
29+
var _ admission.CustomDefaulter = &StatefulSetDefaulter{}
30+
31+
// Default implements webhook.Defaulter so a webhook will be registered for the type
32+
func (*StatefulSetDefaulter) Default(ctx context.Context, obj runtime.Object) error {
33+
sts, ok := obj.(*appsv1.StatefulSet)
34+
if !ok {
35+
return fmt.Errorf("unknown obj type %T", obj)
36+
}
37+
38+
req, err := admission.RequestFromContext(ctx)
39+
if err != nil {
40+
return fmt.Errorf("failed to get admission request from context: %w", err)
41+
}
42+
43+
if req.Operation != admissionv1.Update && req.Operation != admissionv1.Create {
44+
return nil
45+
}
46+
47+
if len(sts.OwnerReferences) != 1 {
48+
return nil
49+
}
50+
51+
if sts.OwnerReferences[0].Kind != "MySQLCluster" && sts.OwnerReferences[0].APIVersion != GroupVersion.String() {
52+
return nil
53+
}
54+
55+
if sts.Annotations[constants.AnnForceRollingUpdate] == "true" {
56+
sts.Spec.UpdateStrategy.RollingUpdate = nil
57+
return nil
58+
}
59+
60+
if sts.Spec.UpdateStrategy.RollingUpdate == nil || sts.Spec.UpdateStrategy.RollingUpdate.Partition == nil {
61+
sts.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{
62+
Partition: ptr.To[int32](*sts.Spec.Replicas),
63+
}
64+
return nil
65+
}
66+
67+
if req.OldObject.Raw == nil {
68+
return nil
69+
}
70+
71+
oldSts, err := readStatefulSet(req.OldObject.Raw)
72+
if err != nil {
73+
return fmt.Errorf("failed to read old statefulset: %w", err)
74+
}
75+
76+
partition := *sts.Spec.UpdateStrategy.RollingUpdate.Partition
77+
oldPartition := *oldSts.Spec.UpdateStrategy.RollingUpdate.Partition
78+
79+
newSts := sts.DeepCopy()
80+
newSts.Spec.UpdateStrategy = oldSts.Spec.UpdateStrategy
81+
82+
if partition != oldPartition && equality.Semantic.DeepEqual(newSts.Spec, oldSts.Spec) {
83+
return nil
84+
}
85+
86+
sts.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{
87+
Partition: ptr.To[int32](*sts.Spec.Replicas),
88+
}
89+
90+
return nil
91+
}
92+
93+
func readStatefulSet(raw []byte) (*appsv1.StatefulSet, error) {
94+
var sts appsv1.StatefulSet
95+
96+
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(raw, nil, &sts); err != nil {
97+
return nil, err
98+
}
99+
100+
sts.TypeMeta.APIVersion = appsv1.SchemeGroupVersion.Group + "/" + appsv1.SchemeGroupVersion.Version
101+
102+
return &sts, nil
103+
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package v1beta2_test
2+
3+
import (
4+
"context"
5+
6+
. "github.com/onsi/ginkgo"
7+
. "github.com/onsi/gomega"
8+
appsv1 "k8s.io/api/apps/v1"
9+
corev1 "k8s.io/api/core/v1"
10+
apierrors "k8s.io/apimachinery/pkg/api/errors"
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"k8s.io/utils/ptr"
13+
"sigs.k8s.io/controller-runtime/pkg/client"
14+
15+
"github.com/cybozu-go/moco/pkg/constants"
16+
)
17+
18+
func makeStatefulSet() *appsv1.StatefulSet {
19+
return &appsv1.StatefulSet{
20+
ObjectMeta: metav1.ObjectMeta{
21+
Name: "test",
22+
Namespace: "default",
23+
OwnerReferences: []metav1.OwnerReference{
24+
{
25+
APIVersion: "moco.cybozu.com/v1beta2",
26+
Kind: "MySQLCluster",
27+
Name: "test",
28+
UID: "uid",
29+
},
30+
},
31+
},
32+
Spec: appsv1.StatefulSetSpec{
33+
Replicas: ptr.To[int32](3),
34+
Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
35+
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
36+
Type: appsv1.RollingUpdateStatefulSetStrategyType,
37+
},
38+
Template: corev1.PodTemplateSpec{
39+
ObjectMeta: metav1.ObjectMeta{
40+
Labels: map[string]string{"foo": "bar"},
41+
},
42+
Spec: corev1.PodSpec{
43+
Containers: []corev1.Container{
44+
{
45+
Name: "mysql",
46+
Image: "mysql:examle",
47+
},
48+
},
49+
},
50+
},
51+
},
52+
}
53+
}
54+
55+
func deleteStatefulSet() error {
56+
r := &appsv1.StatefulSet{}
57+
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "default", Name: "test"}, r)
58+
if apierrors.IsNotFound(err) {
59+
return nil
60+
}
61+
if err != nil {
62+
return err
63+
}
64+
65+
r.Finalizers = nil
66+
if err := k8sClient.Update(ctx, r); err != nil {
67+
return err
68+
}
69+
70+
if err := k8sClient.Delete(ctx, r); err != nil {
71+
return err
72+
}
73+
74+
return nil
75+
}
76+
77+
var _ = Describe("StatefulSet Webhook", func() {
78+
ctx := context.TODO()
79+
80+
BeforeEach(func() {
81+
err := deleteStatefulSet()
82+
Expect(err).NotTo(HaveOccurred())
83+
})
84+
85+
It("should set partition when creating StatefulSet", func() {
86+
r := makeStatefulSet()
87+
err := k8sClient.Create(ctx, r)
88+
Expect(err).NotTo(HaveOccurred())
89+
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
90+
})
91+
92+
It("should set partition when updating StatefulSet", func() {
93+
r := makeStatefulSet()
94+
r.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{
95+
Partition: ptr.To[int32](2),
96+
}
97+
err := k8sClient.Create(ctx, r)
98+
Expect(err).NotTo(HaveOccurred())
99+
100+
err = k8sClient.Update(ctx, r)
101+
Expect(err).NotTo(HaveOccurred())
102+
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
103+
})
104+
105+
It("should not set partition when forcing updating StatefulSet", func() {
106+
r := makeStatefulSet()
107+
err := k8sClient.Create(ctx, r)
108+
Expect(err).NotTo(HaveOccurred())
109+
110+
err = k8sClient.Update(ctx, r)
111+
Expect(err).NotTo(HaveOccurred())
112+
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
113+
114+
r.Annotations = map[string]string{constants.AnnForceRollingUpdate: "true"}
115+
err = k8sClient.Update(ctx, r)
116+
Expect(err).NotTo(HaveOccurred())
117+
Expect(r.Spec.UpdateStrategy.RollingUpdate).To(BeNil())
118+
})
119+
120+
It("should set partition when forcing updating StatefulSet with invalid value", func() {
121+
r := makeStatefulSet()
122+
err := k8sClient.Create(ctx, r)
123+
Expect(err).NotTo(HaveOccurred())
124+
125+
r.Annotations = map[string]string{constants.AnnForceRollingUpdate: "false"}
126+
err = k8sClient.Update(ctx, r)
127+
Expect(err).NotTo(HaveOccurred())
128+
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
129+
})
130+
131+
It("should not update partition when updating StatefulSet with only partition changed", func() {
132+
r := makeStatefulSet()
133+
err := k8sClient.Create(ctx, r)
134+
Expect(err).NotTo(HaveOccurred())
135+
136+
err = k8sClient.Update(ctx, r)
137+
Expect(err).NotTo(HaveOccurred())
138+
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
139+
140+
r.Spec.UpdateStrategy.RollingUpdate.Partition = ptr.To[int32](2)
141+
err = k8sClient.Update(ctx, r)
142+
Expect(err).NotTo(HaveOccurred())
143+
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(ptr.To[int32](2)))
144+
})
145+
146+
It("should update partition when updating StatefulSet with partition and same field changed", func() {
147+
r := makeStatefulSet()
148+
err := k8sClient.Create(ctx, r)
149+
Expect(err).NotTo(HaveOccurred())
150+
151+
err = k8sClient.Update(ctx, r)
152+
Expect(err).NotTo(HaveOccurred())
153+
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
154+
155+
r.Spec.Replicas = ptr.To[int32](5)
156+
r.Spec.UpdateStrategy.RollingUpdate.Partition = ptr.To[int32](2)
157+
err = k8sClient.Update(ctx, r)
158+
Expect(err).NotTo(HaveOccurred())
159+
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(ptr.To[int32](5)))
160+
})
161+
162+
It("should update partition when updating StatefulSet with partition unchanged", func() {
163+
r := makeStatefulSet()
164+
err := k8sClient.Create(ctx, r)
165+
Expect(err).NotTo(HaveOccurred())
166+
167+
err = k8sClient.Update(ctx, r)
168+
Expect(err).NotTo(HaveOccurred())
169+
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(r.Spec.Replicas))
170+
171+
r.Spec.Replicas = ptr.To[int32](5)
172+
err = k8sClient.Update(ctx, r)
173+
Expect(err).NotTo(HaveOccurred())
174+
Expect(r.Spec.UpdateStrategy.RollingUpdate.Partition).To(Equal(ptr.To[int32](5)))
175+
})
176+
})

api/v1beta2/webhook_suite_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ var _ = BeforeSuite(func() {
9898
Expect(err).NotTo(HaveOccurred())
9999
err = (&mocov1beta2.BackupPolicy{}).SetupWebhookWithManager(mgr)
100100
Expect(err).NotTo(HaveOccurred())
101+
err = mocov1beta2.SetupStatefulSetWebhookWithManager(mgr)
102+
Expect(err).NotTo(HaveOccurred())
101103

102104
//+kubebuilder:scaffold:webhook
103105

api/v1beta2/zz_generated.deepcopy.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

charts/moco/templates/generated/generated.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,12 @@ rules:
155155
- patch
156156
- update
157157
- watch
158+
- apiGroups:
159+
- ""
160+
resources:
161+
- pods/status
162+
verbs:
163+
- get
158164
- apiGroups:
159165
- ""
160166
resources:
@@ -460,6 +466,26 @@ webhooks:
460466
resources:
461467
- mysqlclusters
462468
sideEffects: None
469+
- admissionReviewVersions:
470+
- v1
471+
clientConfig:
472+
service:
473+
name: moco-webhook-service
474+
namespace: '{{ .Release.Namespace }}'
475+
path: /mutate-apps-v1-statefulset
476+
failurePolicy: Fail
477+
name: statefulset.kb.io
478+
rules:
479+
- apiGroups:
480+
- apps
481+
apiVersions:
482+
- v1
483+
operations:
484+
- CREATE
485+
- UPDATE
486+
resources:
487+
- statefulsets
488+
sideEffects: None
463489
---
464490
apiVersion: admissionregistration.k8s.io/v1
465491
kind: ValidatingWebhookConfiguration

cmd/moco-controller/cmd/run.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ func subMain(ns, addr string, port int) error {
119119
return err
120120
}
121121

122+
if err = (&controllers.StatefulSetPartitionReconciler{
123+
Client: mgr.GetClient(),
124+
Recorder: mgr.GetEventRecorderFor("moco-controller"),
125+
MaxConcurrentReconciles: config.maxConcurrentReconciles,
126+
}).SetupWithManager(mgr); err != nil {
127+
setupLog.Error(err, "unable to create controller", "controller", "Partition")
128+
return err
129+
}
130+
122131
if err = (&controllers.PodWatcher{
123132
Client: mgr.GetClient(),
124133
ClusterManager: clusterMgr,
@@ -138,6 +147,11 @@ func subMain(ns, addr string, port int) error {
138147
return err
139148
}
140149

150+
if err := mocov1beta2.SetupStatefulSetWebhookWithManager(mgr); err != nil {
151+
setupLog.Error(err, "unable to setup webhook", "webhook", "StatefulSet")
152+
return err
153+
}
154+
141155
if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
142156
setupLog.Error(err, "unable to set up health check")
143157
return err

config/rbac/role.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ rules:
5050
- patch
5151
- update
5252
- watch
53+
- apiGroups:
54+
- ""
55+
resources:
56+
- pods/status
57+
verbs:
58+
- get
5359
- apiGroups:
5460
- ""
5561
resources:

0 commit comments

Comments
 (0)