@@ -19,18 +19,26 @@ package pod
1919import (
2020 "context"
2121 "encoding/json"
22+ "fmt"
2223 "net/http"
2324
2425 admissionv1 "k8s.io/api/admission/v1"
26+ appsv1 "k8s.io/api/apps/v1"
2527 v1 "k8s.io/api/core/v1"
28+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+ "k8s.io/apimachinery/pkg/runtime"
30+ "k8s.io/apimachinery/pkg/runtime/schema"
31+ "k8s.io/apimachinery/pkg/util/strategicpatch"
2632 kubeclientset "k8s.io/client-go/kubernetes"
33+ "k8s.io/client-go/kubernetes/scheme"
2734 "k8s.io/klog/v2"
2835 ctrl "sigs.k8s.io/controller-runtime"
2936 "sigs.k8s.io/controller-runtime/pkg/client"
3037 "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
3138 "sigs.k8s.io/controller-runtime/pkg/webhook/admission"
3239
3340 "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh"
41+ "github.com/KusionStack/controller-mesh/pkg/manager/controllers/rollout"
3442)
3543
3644type MutatingHandler struct {
@@ -90,3 +98,87 @@ func (h *MutatingHandler) InjectDecoder(d *admission.Decoder) error {
9098 h .Decoder = d
9199 return nil
92100}
101+
102+ func (h * MutatingHandler ) revisionRollOut (ctx context.Context , pod * v1.Pod ) (err error ) {
103+ podRevision := pod .Labels [appsv1 .ControllerRevisionHashLabelKey ]
104+ sts := & appsv1.StatefulSet {}
105+ if pod .OwnerReferences == nil || len (pod .OwnerReferences ) == 0 {
106+ return fmt .Errorf ("illegal owner reference" )
107+ }
108+ if pod .OwnerReferences [0 ].Kind != "StatefulSet" {
109+ return fmt .Errorf ("illegal owner reference kind %s" , pod .OwnerReferences [0 ].Kind )
110+ }
111+
112+ sts , err = h .directKubeClient .AppsV1 ().StatefulSets (pod .Namespace ).Get (ctx , pod .OwnerReferences [0 ].Name , metav1.GetOptions {})
113+ if err != nil {
114+ klog .Error (err )
115+ return err
116+ }
117+ if sts .Spec .UpdateStrategy .Type != appsv1 .OnDeleteStatefulSetStrategyType {
118+ return nil
119+ }
120+ expectState := rollout .GetExpectedRevision (sts )
121+ if expectState .UpdateRevision == "" || expectState .PodRevision == nil || expectState .PodRevision [pod .Name ] == "" {
122+ return
123+ }
124+ expectedRevision := expectState .PodRevision [pod .Name ]
125+ if expectedRevision == podRevision {
126+ return
127+ }
128+ // Do not use manager client get ControllerRevision. (To avoid Informer cache)
129+ expectRevision , err := h .directKubeClient .AppsV1 ().ControllerRevisions (pod .Namespace ).Get (ctx , expectedRevision , metav1.GetOptions {})
130+ if err != nil {
131+ return fmt .Errorf ("cannot find old ControllerRevision %s" , expectedRevision )
132+ }
133+
134+ createRevision , err := h .directKubeClient .AppsV1 ().ControllerRevisions (pod .Namespace ).Get (ctx , podRevision , metav1.GetOptions {})
135+ if err != nil {
136+ return fmt .Errorf ("cannot find ControllerRevision %s by pod %s/%s" , podRevision , pod .Namespace , pod .Name )
137+ }
138+
139+ expectedSts := & appsv1.StatefulSet {}
140+ createdSts := & appsv1.StatefulSet {}
141+
142+ applyPatch (expectedSts , & expectRevision .Data .Raw )
143+ applyPatch (createdSts , & createRevision .Data .Raw )
144+
145+ expectedPo := & v1.Pod {
146+ Spec : expectedSts .Spec .Template .Spec ,
147+ }
148+ createdPo := & v1.Pod {
149+ Spec : createdSts .Spec .Template .Spec ,
150+ }
151+
152+ expectedBt , _ := runtime .Encode (patchCodec , expectedPo )
153+ createdBt , _ := runtime .Encode (patchCodec , createdPo )
154+ currentBt , _ := runtime .Encode (patchCodec , pod )
155+
156+ patch , err := strategicpatch .CreateTwoWayMergePatch (createdBt , expectedBt , expectedPo )
157+ if err != nil {
158+ return err
159+ }
160+ originBt , err := strategicpatch .StrategicMergePatch (currentBt , patch , pod )
161+ if err != nil {
162+ return err
163+ }
164+ newPod := & v1.Pod {}
165+ if err = json .Unmarshal (originBt , newPod ); err != nil {
166+ return err
167+ }
168+ pod .Spec = newPod .Spec
169+ pod .Labels [appsv1 .ControllerRevisionHashLabelKey ] = expectedRevision
170+ return
171+ }
172+
173+ var patchCodec = scheme .Codecs .LegacyCodec (schema.GroupVersion {Group : "apps" , Version : "v1" }, schema.GroupVersion {Version : "v1" })
174+
175+ func applyPatch (target runtime.Object , podPatch * []byte ) error {
176+ patched , err := strategicpatch .StrategicMergePatch ([]byte (runtime .EncodeOrDie (patchCodec , target )), * podPatch , target )
177+ if err != nil {
178+ return err
179+ }
180+ if err = json .Unmarshal (patched , target ); err != nil {
181+ return err
182+ }
183+ return nil
184+ }
0 commit comments