@@ -19,18 +19,26 @@ package pod
1919import (
2020 "context"
2121 "encoding/json"
22+ "fmt"
2223 "net/http"
2324
2425 admissionv1 "k8s.io/api/admission/v1"
26+ appsv1 "k8s.io/api/apps/v1"
2527 v1 "k8s.io/api/core/v1"
28+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+ "k8s.io/apimachinery/pkg/runtime"
30+ "k8s.io/apimachinery/pkg/runtime/schema"
31+ "k8s.io/apimachinery/pkg/util/strategicpatch"
2632 kubeclientset "k8s.io/client-go/kubernetes"
33+ "k8s.io/client-go/kubernetes/scheme"
2734 "k8s.io/klog/v2"
2835 ctrl "sigs.k8s.io/controller-runtime"
2936 "sigs.k8s.io/controller-runtime/pkg/client"
3037 "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
3138 "sigs.k8s.io/controller-runtime/pkg/webhook/admission"
3239
3340 "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh"
41+ "github.com/KusionStack/controller-mesh/pkg/manager/controllers/rollout"
3442)
3543
3644type MutatingHandler struct {
@@ -90,3 +98,89 @@ func (h *MutatingHandler) InjectDecoder(d *admission.Decoder) error {
9098 h .Decoder = d
9199 return nil
92100}
101+
102+ func (h * MutatingHandler ) revisionRollOut (ctx context.Context , pod * v1.Pod ) (err error ) {
103+ podRevision := pod .Labels [appsv1 .ControllerRevisionHashLabelKey ]
104+ sts := & appsv1.StatefulSet {}
105+ if pod .OwnerReferences == nil || len (pod .OwnerReferences ) == 0 {
106+ return fmt .Errorf ("illegal owner reference" )
107+ }
108+ if pod .OwnerReferences [0 ].Kind != "StatefulSet" {
109+ return fmt .Errorf ("illegal owner reference kind %s" , pod .OwnerReferences [0 ].Kind )
110+ }
111+
112+ sts , err = h .directKubeClient .AppsV1 ().StatefulSets (pod .Namespace ).Get (ctx , pod .OwnerReferences [0 ].Name , metav1.GetOptions {})
113+ if err != nil {
114+ klog .Error (err )
115+ return err
116+ }
117+ if sts .Spec .UpdateStrategy .Type != appsv1 .OnDeleteStatefulSetStrategyType {
118+ return nil
119+ }
120+ expectState := rollout .GetExpectedRevision (sts )
121+ if expectState .UpdateRevision == "" || expectState .PodRevision == nil || expectState .PodRevision [pod .Name ] == "" {
122+ return
123+ }
124+ expectedRevision := expectState .PodRevision [pod .Name ]
125+ if expectedRevision == podRevision {
126+ return
127+ }
128+
129+ h .directKubeClient .AppsV1 ().ControllerRevisions (pod .Namespace ).Get (ctx , pod .OwnerReferences [0 ].Name , metav1.GetOptions {})
130+
131+ expectRevision , err := h .directKubeClient .AppsV1 ().ControllerRevisions (pod .Namespace ).Get (ctx , expectedRevision , metav1.GetOptions {})
132+ if err != nil {
133+ return fmt .Errorf ("cannot find old ControllerRevision %s" , expectedRevision )
134+ }
135+
136+ createRevision , err := h .directKubeClient .AppsV1 ().ControllerRevisions (pod .Namespace ).Get (ctx , podRevision , metav1.GetOptions {})
137+ if err != nil {
138+ return fmt .Errorf ("cannot find ControllerRevision %s by pod %s/%s" , podRevision , pod .Namespace , pod .Name )
139+ }
140+
141+ expectedSts := & appsv1.StatefulSet {}
142+ createdSts := & appsv1.StatefulSet {}
143+
144+ applyPatch (expectedSts , & expectRevision .Data .Raw )
145+ applyPatch (createdSts , & createRevision .Data .Raw )
146+
147+ expectedPo := & v1.Pod {
148+ Spec : expectedSts .Spec .Template .Spec ,
149+ }
150+ createdPo := & v1.Pod {
151+ Spec : createdSts .Spec .Template .Spec ,
152+ }
153+
154+ expectedBt , _ := runtime .Encode (patchCodec , expectedPo )
155+ createdBt , _ := runtime .Encode (patchCodec , createdPo )
156+ currentBt , _ := runtime .Encode (patchCodec , pod )
157+
158+ patch , err := strategicpatch .CreateTwoWayMergePatch (createdBt , expectedBt , expectedPo )
159+ if err != nil {
160+ return err
161+ }
162+ originBt , err := strategicpatch .StrategicMergePatch (currentBt , patch , pod )
163+ if err != nil {
164+ return err
165+ }
166+ newPod := & v1.Pod {}
167+ if err = json .Unmarshal (originBt , newPod ); err != nil {
168+ return err
169+ }
170+ pod .Spec = newPod .Spec
171+ pod .Labels [appsv1 .ControllerRevisionHashLabelKey ] = expectedRevision
172+ return
173+ }
174+
175+ var patchCodec = scheme .Codecs .LegacyCodec (schema.GroupVersion {Group : "apps" , Version : "v1" }, schema.GroupVersion {Version : "v1" })
176+
177+ func applyPatch (target runtime.Object , podPatch * []byte ) error {
178+ patched , err := strategicpatch .StrategicMergePatch ([]byte (runtime .EncodeOrDie (patchCodec , target )), * podPatch , target )
179+ if err != nil {
180+ return err
181+ }
182+ if err = json .Unmarshal (patched , target ); err != nil {
183+ return err
184+ }
185+ return nil
186+ }
0 commit comments