From 1a96198340eacaa773a05380e057abffd3e0d750 Mon Sep 17 00:00:00 2001 From: Eikykun Date: Fri, 15 Dec 2023 17:26:08 +0800 Subject: [PATCH 1/6] fix podTransitionRule webhook interval and rename PodTransitionRule shortName --- apis/apps/v1alpha1/podtransitionrule_types.go | 2 +- .../bases/apps.kusionstack.io_podtransitionrules.yaml | 2 +- .../podtransitionrule/processor/processor.go | 2 +- .../podtransitionrule/processor/rules/webhook.go | 10 ++++++---- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/apis/apps/v1alpha1/podtransitionrule_types.go b/apis/apps/v1alpha1/podtransitionrule_types.go index e50af808..d4208230 100644 --- a/apis/apps/v1alpha1/podtransitionrule_types.go +++ b/apis/apps/v1alpha1/podtransitionrule_types.go @@ -231,7 +231,7 @@ type RejectInfo struct { // +k8s:openapi-gen=true // +kubebuilder:object:root=true // +kubebuilder:subresource:status -// +kubebuilder:resource:shortName=rs +// +kubebuilder:resource:shortName=ptr // PodTransitionRule is the Schema for the podtransitionrules API type PodTransitionRule struct { diff --git a/config/crd/bases/apps.kusionstack.io_podtransitionrules.yaml b/config/crd/bases/apps.kusionstack.io_podtransitionrules.yaml index b8decfd6..d3cc4495 100644 --- a/config/crd/bases/apps.kusionstack.io_podtransitionrules.yaml +++ b/config/crd/bases/apps.kusionstack.io_podtransitionrules.yaml @@ -13,7 +13,7 @@ spec: listKind: PodTransitionRuleList plural: podtransitionrules shortNames: - - rs + - ptr singular: podtransitionrule scope: Namespaced versions: diff --git a/pkg/controllers/podtransitionrule/processor/processor.go b/pkg/controllers/podtransitionrule/processor/processor.go index 790ded4b..f155961d 100644 --- a/pkg/controllers/podtransitionrule/processor/processor.go +++ b/pkg/controllers/podtransitionrule/processor/processor.go @@ -173,7 +173,7 @@ func (p *Processor) Process(targets map[string]*corev1.Pod) *ProcessResult { RuleStates: ruleStates, } - if minInterval != time.Duration(math.MaxInt32) { + if minInterval != time.Duration(math.MaxInt32)*time.Second { res.Interval = &minInterval } return res diff --git a/pkg/controllers/podtransitionrule/processor/rules/webhook.go b/pkg/controllers/podtransitionrule/processor/rules/webhook.go index 67facf0c..7f69781a 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/webhook.go +++ b/pkg/controllers/podtransitionrule/processor/rules/webhook.go @@ -212,15 +212,14 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt return hasChecked }, taskId) w.recordTimeOld(taskId) - w.updateInterval(wait / time.Second) + w.updateInterval(wait) continue } res, err := w.polling(taskId) - w.recordTime(taskId, res.Message, false) - if err != nil { + w.recordTime(taskId, err.Error(), false) newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, pods, func(po string) bool { hasChecked := checked.Has(po) if !hasChecked { @@ -230,6 +229,7 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt }, taskId) continue } + w.recordTime(taskId, res.Message, false) // all failed if !res.Success { @@ -251,7 +251,9 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt }, taskId) continue } - + if w.Webhook.ClientConfig.Poll.IntervalSeconds != nil { + w.updateInterval(time.Duration(*w.Webhook.ClientConfig.Poll.IntervalSeconds) * time.Second) + } localFinished := sets.NewString(res.FinishedNames...) // query trace if !res.Stop { From 62dfed0ba50b9dbca6b92f1e2336f6ef21134439 Mon Sep 17 00:00:00 2001 From: Eikykun Date: Mon, 25 Dec 2023 15:42:30 +0800 Subject: [PATCH 2/6] enhance: PT webhook polling --- apis/apps/v1alpha1/podtransitionrule_types.go | 4 + .../podtransitionrule/eventhandler.go | 7 + .../podtransitionrule_controller.go | 14 +- .../podtransitionrule_controller_test.go | 198 ++++++++++++ .../processor/rules/available.go | 4 +- .../processor/rules/polling.go | 277 +++++++++++++++++ .../processor/rules/types.go | 2 +- .../processor/rules/webhook.go | 282 +++++++----------- .../processor/rules/webhook_test.go | 4 +- .../podtransitionrule/utils/check.go | 9 +- 10 files changed, 610 insertions(+), 191 deletions(-) create mode 100644 pkg/controllers/podtransitionrule/processor/rules/polling.go diff --git a/apis/apps/v1alpha1/podtransitionrule_types.go b/apis/apps/v1alpha1/podtransitionrule_types.go index d4208230..7f6f04bb 100644 --- a/apis/apps/v1alpha1/podtransitionrule_types.go +++ b/apis/apps/v1alpha1/podtransitionrule_types.go @@ -192,6 +192,10 @@ type WebhookStatus struct { type TaskInfo struct { TaskId string `json:"taskId,omitempty"` + //Processing []string `json:"targets,omitempty"` + + //Approved []string `json:"approved,omitempty"` + BeginTime *metav1.Time `json:"beginTime,omitempty"` LastTime *metav1.Time `json:"lastTime,omitempty"` diff --git a/pkg/controllers/podtransitionrule/eventhandler.go b/pkg/controllers/podtransitionrule/eventhandler.go index b0384a79..d6b31d9b 100644 --- a/pkg/controllers/podtransitionrule/eventhandler.go +++ b/pkg/controllers/podtransitionrule/eventhandler.go @@ -31,12 +31,19 @@ import ( "sigs.k8s.io/controller-runtime/pkg/runtime/inject" appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1" + processorrules "kusionstack.io/operating/pkg/controllers/podtransitionrule/processor/rules" commonutils "kusionstack.io/operating/pkg/utils" ) var _ inject.Client = &EventHandler{} var _ inject.Logger = &EventHandler{} +func NewWebhookGenericEventChannel() <-chan event.GenericEvent { + webhookTriggerChannel := make(chan event.GenericEvent, 1<<10) + processorrules.PollingManager.AddListener(webhookTriggerChannel) + return webhookTriggerChannel +} + type EventHandler struct { // client and logger will be injected client client.Client diff --git a/pkg/controllers/podtransitionrule/podtransitionrule_controller.go b/pkg/controllers/podtransitionrule/podtransitionrule_controller.go index 5260c4d4..d417b990 100644 --- a/pkg/controllers/podtransitionrule/podtransitionrule_controller.go +++ b/pkg/controllers/podtransitionrule/podtransitionrule_controller.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -82,6 +83,10 @@ func addToMgr(mgr manager.Manager, r reconcile.Reconciler) (controller.Controlle return c, err } + err = c.Watch(&source.Channel{Source: NewWebhookGenericEventChannel()}, &handler.EnqueueRequestForObject{}) + if err != nil { + return c, err + } return c, nil } @@ -331,8 +336,15 @@ func updateDetail(details map[string]*appsv1alpha1.PodTransitionDetail, passRule } func equalStatus(updated *appsv1alpha1.PodTransitionRuleStatus, current *appsv1alpha1.PodTransitionRuleStatus) bool { - return equality.Semantic.DeepEqual(updated.Targets, current.Targets) && + deepEqual := equality.Semantic.DeepEqual(updated.Targets, current.Targets) && equality.Semantic.DeepEqual(updated.Details, current.Details) && equality.Semantic.DeepEqual(updated.RuleStates, current.RuleStates) && updated.ObservedGeneration == current.ObservedGeneration + if !deepEqual { + fmt.Println("test: update pd status ") + fmt.Println(utils.DumpJSON(current)) + fmt.Println(utils.DumpJSON(updated)) + return utils.DumpJSON(updated) == utils.DumpJSON(current) + } + return deepEqual } diff --git a/pkg/controllers/podtransitionrule/podtransitionrule_controller_test.go b/pkg/controllers/podtransitionrule/podtransitionrule_controller_test.go index 0da25488..21a2d56d 100644 --- a/pkg/controllers/podtransitionrule/podtransitionrule_controller_test.go +++ b/pkg/controllers/podtransitionrule/podtransitionrule_controller_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -305,6 +306,115 @@ func TestWebhookRule(t *testing.T) { }, 5*time.Second, 1*time.Second).Should(gomega.HaveOccurred()) } +func TestWebhookPoll(t *testing.T) { + g := gomega.NewGomegaWithT(t) + stop, finish, taskStartTime, taskFinishTime := RunPollingServer() + defer func() { + stop <- struct{}{} + <-finish + }() + stage := PreTrafficOffStage + policy := appsv1alpha1.Fail + interval := int64(3) + timeout := int64(60) + rs := &appsv1alpha1.PodTransitionRule{ + ObjectMeta: metav1.ObjectMeta{ + Name: "podtransitionrule-poll-test", + Namespace: "default", + }, + Spec: appsv1alpha1.PodTransitionRuleSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "test": "gen", + }, + }, + Rules: []appsv1alpha1.TransitionRule{ + { + Stage: &stage, + Name: "webhook", + TransitionRuleDefinition: appsv1alpha1.TransitionRuleDefinition{ + Webhook: &appsv1alpha1.TransitionRuleWebhook{ + ClientConfig: appsv1alpha1.ClientConfigBeta1{ + URL: "http://127.0.0.1:8999/first", + Poll: &appsv1alpha1.Poll{ + URL: "http://127.0.0.1:8999/poll", + IntervalSeconds: &interval, + TimeoutSeconds: &timeout, + }, + }, + FailurePolicy: &policy, + }, + }, + }, + }, + }, + } + var pods []*corev1.Pod + pods = append(pods, + genDefaultPod("default", "pod-test-1"), + genDefaultPod("default", "pod-test-2"), + genDefaultPod("default", "pod-test-3"), + genDefaultPod("default", "pod-test-4")) + pods[0].Labels[StageLabel] = PreTrafficOffStage + pods[1].Labels[StageLabel] = PreTrafficOffStage + pods[2].Labels[StageLabel] = PreTrafficOffStage + pods[3].Labels[StageLabel] = PreTrafficOffStage + for _, po := range pods { + g.Expect(c.Create(ctx, po)).NotTo(gomega.HaveOccurred()) + } + + podList := &corev1.PodList{} + defer func() { + c.List(ctx, podList, client.InNamespace("default")) + for _, po := range podList.Items { + g.Expect(c.Delete(ctx, &po)).NotTo(gomega.HaveOccurred()) + } + }() + g.Eventually(func() int { + g.Expect(c.List(ctx, podList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(rs.Spec.Selector.MatchLabels), + })).ShouldNot(gomega.HaveOccurred()) + return len(podList.Items) + }, 5*time.Second, 1*time.Second).Should(gomega.Equal(4)) + g.Expect(c.Create(ctx, rs)).NotTo(gomega.HaveOccurred()) + g.Eventually(func() bool { + if err := c.Get(ctx, types.NamespacedName{ + Name: rs.Name, + Namespace: rs.Namespace, + }, rs); err == nil { + return len(rs.Status.Details) == 4 + } + return false + }, 5*time.Second, 500*time.Millisecond).Should(gomega.BeTrue()) + passCaseMap := map[int]bool{} + g.Eventually(func() int { + g.Expect(c.List(ctx, podList, &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(rs.Spec.Selector.MatchLabels), + })).NotTo(gomega.HaveOccurred()) + // LifeCycle 2. 2 pod passed + passedCount := 0 + for i := range podList.Items { + state, err := PodTransitionRuleManager().GetState(ctx, c, &podList.Items[i]) + g.Expect(err).NotTo(gomega.HaveOccurred()) + if state.InStageAndPassed() && state.Stage == PreTrafficOffStage { + passedCount++ + } + } + passCaseMap[passedCount] = true + return passedCount + }, 15*time.Second, 1*time.Second).Should(gomega.Equal(4)) + g.Expect(passCaseMap[0]).Should(gomega.BeTrue()) + g.Expect(passCaseMap[1]).Should(gomega.BeTrue()) + g.Expect(passCaseMap[2]).Should(gomega.BeTrue()) + g.Expect(passCaseMap[3]).Should(gomega.BeTrue()) + g.Expect(passCaseMap[4]).Should(gomega.BeTrue()) + for taskId, tm := range taskStartTime { + cost := taskFinishTime[taskId].Sub(tm) + fmt.Printf("task %s, cost %d seconds\n", taskId, cost/time.Second) + g.Expect(cost >= 12*time.Second).Should(gomega.BeTrue()) + } +} + const ( StageLabel = "test.kafe.io/stage" ConditionLabel = "test.kafe.io/condition" @@ -445,3 +555,91 @@ func RunHttpServer(f func(http.ResponseWriter, *http.Request), port string) (cha finish := server.Run(stopped) return stopped, finish } + +func RunPollingServer() (chan<- struct{}, <-chan struct{}, map[string]time.Time, map[string]time.Time) { + taskMap := map[string][]string{} + taskIdx := map[string]int{} + taskStartTime := map[string]time.Time{} + taskFinishTime := map[string]time.Time{} + serverMux := http.NewServeMux() + serverMux.HandleFunc("/first", func(resp http.ResponseWriter, req *http.Request) { + fmt.Println(req.URL) + all, err := io.ReadAll(req.Body) + if err != nil { + fmt.Println(fmt.Sprintf("read body err: %s", err)) + } + webReq := &appsv1alpha1.WebhookRequest{} + fmt.Printf("handle http req. time %s: %s\n", time.Now().String(), string(all)) + err = json.Unmarshal(all, webReq) + if err != nil { + fmt.Printf("fail to unmarshal webhook request: %v", err) + http.Error(resp, fmt.Sprintf("fail to unmarshal webhook request %s", string(all)), http.StatusInternalServerError) + return + } + var names []string + for _, resource := range webReq.Resources { + names = append(names, resource.Name) + } + taskID := uuid.New().String() + fmt.Printf("handle pods %v\n, task %s", names, taskID) + taskMap[taskID] = names + taskStartTime[taskID] = time.Now() + webhookResp := &appsv1alpha1.WebhookResponse{ + Success: true, + Message: fmt.Sprintf("init task %s", taskID), + Poll: true, + TaskId: taskID, + } + byt, _ := json.Marshal(webhookResp) + resp.Write(byt) + }) + serverMux.HandleFunc("/poll", func(resp http.ResponseWriter, req *http.Request) { + fmt.Println(req.URL) + taskId := req.URL.Query().Get("task-id") + + pods, ok := taskMap[taskId] + if !ok { + panic(fmt.Sprintf("taskId %s not found", taskId)) + } + idx := taskIdx[taskId] + webhookResp := &appsv1alpha1.PollResponse{} + if idx == len(pods)-1 { + taskFinishTime[taskId] = time.Now() + webhookResp = &appsv1alpha1.PollResponse{ + Success: true, + Message: "success", + Finished: true, + } + } else { + taskIdx[taskId] = idx + 1 + webhookResp = &appsv1alpha1.PollResponse{ + Success: true, + Message: fmt.Sprintf("test passed pods %v", pods[:idx+1]), + Finished: false, + FinishedNames: pods[:idx+1], + } + } + byt, _ := json.Marshal(webhookResp) + resp.Write(byt) + }) + + sv := &http.Server{Addr: ":8999", Handler: serverMux} + stop := make(chan struct{}) + finish := make(chan struct{}) + go func() { + if err := sv.ListenAndServe(); err != nil { + fmt.Println(err) + } + finish <- struct{}{} + }() + <-time.After(5 * time.Second) + + fmt.Println("run server") + go func() { + select { + case <-stop: + sv.Close() + } + }() + return stop, finish, taskStartTime, taskFinishTime +} diff --git a/pkg/controllers/podtransitionrule/processor/rules/available.go b/pkg/controllers/podtransitionrule/processor/rules/available.go index b3d608b4..4c941976 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/available.go +++ b/pkg/controllers/podtransitionrule/processor/rules/available.go @@ -77,7 +77,7 @@ func (r *AvailableRuler) Filter(podTransitionRule *appsv1alpha1.PodTransitionRul // filter unavailable pods for podName := range effectiveTargets { pod := targets[podName] - if utils.IsPodPassRule(pod, podTransitionRule, r.Name) { + if utils.IsPodPassRule(pod.Name, podTransitionRule, r.Name) { allowUnavailable-- continue } @@ -94,7 +94,7 @@ func (r *AvailableRuler) Filter(podTransitionRule *appsv1alpha1.PodTransitionRul // try approve available pod for podName := range subjects { pod := targets[podName] - if utils.IsPodPassRule(pod, podTransitionRule, r.Name) { + if utils.IsPodPassRule(pod.Name, podTransitionRule, r.Name) { pass.Insert(pod.Name) continue } diff --git a/pkg/controllers/podtransitionrule/processor/rules/polling.go b/pkg/controllers/podtransitionrule/processor/rules/polling.go new file mode 100644 index 00000000..e9a1dd0e --- /dev/null +++ b/pkg/controllers/podtransitionrule/processor/rules/polling.go @@ -0,0 +1,277 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rules + +import ( + "context" + "fmt" + "net/http" + "strings" + "sync" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/event" + + appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1" + utilshttp "kusionstack.io/operating/pkg/utils/http" +) + +const ( + MaxConcurrentTasks = 16 + TaskDeadLineSeconds = 60 +) + +var ( + PollingManager = newPollingManager(context.TODO()) +) + +type PollingManagerInterface interface { + Delete(id string) + Add(id, url, caBundle, resourceKey string, timeout, interval time.Duration) + GetResult(id string) *PollResult + Start(ctx context.Context) + AddListener(chan<- event.GenericEvent) +} + +func newPollingManager(ctx context.Context) PollingManagerInterface { + p := &pollingRunner{ + q: workqueue.New(), + tasks: make(map[string]*task), + ch: make(chan struct{}, MaxConcurrentTasks), + toDelete: make(chan string, 5), + } + go p.Start(ctx) + return p +} + +type pollingRunner struct { + mu sync.RWMutex + tasks map[string]*task + q workqueue.Interface + toDelete chan string + ch chan struct{} + + listeners []chan<- event.GenericEvent +} + +func (r *pollingRunner) AddListener(ch chan<- event.GenericEvent) { + r.mu.Lock() + defer r.mu.Unlock() + r.listeners = append(r.listeners, ch) +} + +func (r *pollingRunner) Start(ctx context.Context) { + stop := make(chan struct{}) + go r.worker(stop) + <-ctx.Done() + r.q.ShutDown() + close(stop) +} + +func (r *pollingRunner) worker(stop <-chan struct{}) { + go func() { + for { + select { + case id := <-r.toDelete: + r.Delete(id) + case <-stop: + return + } + } + }() + for { + id, shutdown := r.q.Get() + klog.Infof("test: get id %s, time %s", id, time.Now()) + if shutdown { + return + } + select { + case <-stop: + return + default: + } + r.acquire() + taskId := id.(string) + go r.doTask(taskId) + } +} + +func (r *pollingRunner) doTask(id string) { + defer r.free() + r.mu.RLock() + t, ok := r.tasks[id] + r.mu.RUnlock() + if !ok || t == nil { + r.q.Done(id) + return + } + if time.Now().After(t.deadlineTime) { + r.toDelete <- id + r.q.Done(id) + return + } + finish := t.do() + r.broadcast(t.resourceKey) + r.q.Done(id) + if !finish { + r.addAfter(id, t.interval) + } +} + +func (r *pollingRunner) broadcast(key string) { + r.mu.RLock() + defer r.mu.RUnlock() + val := strings.Split(key, "/") + for _, l := range r.listeners { + l <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: val[0], Name: val[1]}}} + } +} + +func (r *pollingRunner) addAfter(id string, d time.Duration) { + time.AfterFunc(d, func() { + r.q.Add(id) + }) +} + +func (r *pollingRunner) Add(id, url, caBundle, resourceKey string, timeout, interval time.Duration) { + r.mu.Lock() + defer r.mu.Unlock() + tm := time.Now() + t := &task{ + id: id, + url: url, + caBundle: caBundle, + resourceKey: resourceKey, + timeoutTime: tm.Add(timeout), + deadlineTime: tm.Add(timeout + (TaskDeadLineSeconds-1)*time.Second), + interval: interval, + result: &PollResult{ + Approved: sets.NewString(), + LastMessage: "Waiting for first query...", + }, + } + r.tasks[id] = t + r.addAfter(id, interval) + r.addAfter(id, timeout+TaskDeadLineSeconds*time.Second) +} + +func (r *pollingRunner) GetResult(id string) *PollResult { + r.mu.RLock() + defer r.mu.RUnlock() + return r.tasks[id].getResult() +} + +func (r *pollingRunner) Delete(id string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.tasks, id) +} + +func (r *pollingRunner) acquire() { + r.ch <- struct{}{} +} + +func (r *pollingRunner) free() { + <-r.ch +} + +type task struct { + id string + url string + caBundle string + resourceKey string + + timeoutTime time.Time + deadlineTime time.Time + interval time.Duration + + latestQueryTime time.Time + + result *PollResult + + mu sync.RWMutex +} + +func (t *task) do() bool { + t.mu.Lock() + defer t.mu.Unlock() + if t.result.Stopped { + return false + } + if time.Now().After(t.timeoutTime) { + t.result.Stopped = true + t.result.LastMessage = fmt.Sprintf("Polling Timeout. %s", t.result.LastMessage) + return false + } + t.latestQueryTime = time.Now() + res, err := t.query() + t.result.Count++ + t.result.LastError = err + t.result.LastQueryTime = t.latestQueryTime + if err != nil { + return false + } + t.result.Approved.Insert(res.FinishedNames...) + t.result.Info = t.info() + t.result.LastMessage = res.Message + if res.Success && res.Finished { + t.result.ApproveAll = true + t.result.Stopped = true + } + if res.Stop { + t.result.Stopped = true + } + return t.result.Stopped +} + +func (t *task) query() (*appsv1alpha1.PollResponse, error) { + httpResp, err := utilshttp.DoHttpAndHttpsRequestWithCa(http.MethodGet, t.url, nil, nil, t.caBundle) + if err != nil { + return nil, err + } + resp := &appsv1alpha1.PollResponse{} + if err = utilshttp.ParseResponse(httpResp, resp); err != nil { + return nil, err + } + + return resp, nil +} + +func (t *task) info() string { + return fmt.Sprintf("task-id=%s, url=%s, latestQueryTime=%s.", t.id, t.url, t.latestQueryTime.String()) +} + +func (t *task) getResult() *PollResult { + t.mu.Lock() + defer t.mu.Unlock() + return t.result +} + +type PollResult struct { + Count int + Stopped bool + ApproveAll bool + Approved sets.String + LastMessage string + Info string + LastError error + LastQueryTime time.Time +} diff --git a/pkg/controllers/podtransitionrule/processor/rules/types.go b/pkg/controllers/podtransitionrule/processor/rules/types.go index f28288eb..1f3606aa 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/types.go +++ b/pkg/controllers/podtransitionrule/processor/rules/types.go @@ -117,7 +117,7 @@ func (w *Webhook) buildRequest(pods sets.String) (*appsv1alpha1.WebhookRequest, for _, parameter := range w.Webhook.Parameters { value, err := w.parseParameter(¶meter, w.targets[podName]) if err != nil { - return nil, fmt.Errorf("%s failed to parse parameter, %v", w.key(), err) + return nil, fmt.Errorf("%s failed to parse parameter, %v", w.Key, err) } parameters[parameter.Key] = value } diff --git a/pkg/controllers/podtransitionrule/processor/rules/webhook.go b/pkg/controllers/podtransitionrule/processor/rules/webhook.go index 7f69781a..3ff99acf 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/webhook.go +++ b/pkg/controllers/podtransitionrule/processor/rules/webhook.go @@ -27,6 +27,7 @@ import ( "k8s.io/klog" appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1" + controllerutils "kusionstack.io/operating/pkg/controllers/podtransitionrule/utils" "kusionstack.io/operating/pkg/utils" utilshttp "kusionstack.io/operating/pkg/utils/http" ) @@ -44,13 +45,12 @@ func (r *WebhookRuler) Filter( } const ( - defaultTimeout = 60 * time.Second defaultInterval = 5 * time.Second ) -func GetWebhook(rs *appsv1alpha1.PodTransitionRule, names ...string) (webs []*Webhook) { +func GetWebhook(pt *appsv1alpha1.PodTransitionRule, names ...string) (webs []*Webhook) { ruleNames := sets.NewString(names...) - for i, rule := range rs.Spec.Rules { + for i, rule := range pt.Spec.Rules { var web *appsv1alpha1.TransitionRuleWebhook if rule.Webhook == nil { continue @@ -58,13 +58,13 @@ func GetWebhook(rs *appsv1alpha1.PodTransitionRule, names ...string) (webs []*We if ruleNames.Len() > 0 && !ruleNames.Has(rule.Name) { continue } - web = rs.Spec.Rules[i].Webhook + web = pt.Spec.Rules[i].Webhook ruleState := &appsv1alpha1.RuleState{ Name: rule.Name, } - for j, state := range rs.Status.RuleStates { + for j, state := range pt.Status.RuleStates { if state.Name == rule.Name { - ruleState = rs.Status.RuleStates[j].DeepCopy() + ruleState = pt.Status.RuleStates[j].DeepCopy() break } } @@ -81,9 +81,12 @@ func GetWebhook(rs *appsv1alpha1.PodTransitionRule, names ...string) (webs []*We webs = append(webs, &Webhook{ Stage: rule.Stage, RuleName: rule.Name, - Key: rs.Namespace + "/" + rs.Name + "/" + rule.Name, + Key: pt.Namespace + "/" + pt.Name + "/" + rule.Name, Webhook: web, State: ruleState, + Approved: func(po string) bool { + return controllerutils.IsPodPassRule(po, pt, rule.Name) + }, }) } return webs @@ -97,6 +100,8 @@ type Webhook struct { Webhook *appsv1alpha1.TransitionRuleWebhook State *appsv1alpha1.RuleState + Approved func(string) bool + targets map[string]*corev1.Pod subjects sets.String @@ -122,13 +127,13 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt w.setItems(targets, subjects) w.taskInfo = map[string]*appsv1alpha1.TaskInfo{} effectiveSubjects := sets.NewString(w.subjects.List()...) - passed := sets.NewString() + checked := sets.NewString() rejectedPods := map[string]string{} for sub := range w.subjects { - if alreadyApproved(targets[sub]) { + if w.Approved(targets[sub].Name) { effectiveSubjects.Delete(sub) - passed.Insert(sub) + checked.Insert(sub) } } @@ -141,7 +146,6 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt w.State.WebhookStatus = newWebhookState }() - checked := sets.NewString() taskPods := map[string]sets.String{} allTracingPods := sets.NewString() processingTask := sets.NewString() @@ -174,10 +178,28 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt continue } pods := taskPods[taskId] - - // poll task timeout - if w.timeOut(taskId) { + pollingResult := PollingManager.GetResult(taskId) + w.recordTask(taskId, pollingResult.LastMessage, pollingResult.LastQueryTime) + if pollingResult.ApproveAll { + klog.Infof("polling task finished, approve all pods after %d times, %s, %s", pollingResult.Count, pollingResult.Info, pollingResult.LastMessage) + newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, pods, func(po string) bool { + checked.Insert(po) + return true + }, taskId) + PollingManager.Delete(taskId) + continue + } + var errMsg string + if pollingResult.LastError != nil { + errMsg = fmt.Sprintf("polling task %s error, %v", taskId, pollingResult.LastError) + klog.Warningf(errMsg) + } + if pollingResult.Stopped { + klog.Infof("polling task stopped after %d times, approved pods %v, %s, %s", pollingResult.Count, pollingResult.Approved.List(), pollingResult.Info, pollingResult.LastMessage) for po := range pods { + if pollingResult.Approved.Has(po) { + checked.Insert(po) + } if checked.Has(po) { newWebhookState.ItemStatus = append(newWebhookState.ItemStatus, &appsv1alpha1.ItemStatus{ Name: po, @@ -186,110 +208,37 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt }) } else { allTracingPods.Delete(po) - rejectedPods[po] = fmt.Sprintf("webhook check [%s] timeout, trace %s", w.key(), taskId) - } - } - continue - } - - if ok, wait, cost := w.outInterval(taskId); !ok { - var lastMsg string - info := w.getTaskInfo(taskId) - if info != nil { - lastMsg = info.Message - } - newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, pods, func(po string) bool { - hasChecked := checked.Has(po) - if !hasChecked { rejectedPods[po] = fmt.Sprintf( - "webhook check [%s], traceId %s is waiting for next interval, msg: %s ,cost time %s", - w.key(), + "Not approved by webhook %s, polling task %s stoped %s %s", + w.Key, taskId, - lastMsg, - cost.String(), + pollingResult.LastMessage, + errMsg, ) } - return hasChecked - }, taskId) - w.recordTimeOld(taskId) - w.updateInterval(wait) - continue - } - - res, err := w.polling(taskId) - - if err != nil { - w.recordTime(taskId, err.Error(), false) - newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, pods, func(po string) bool { - hasChecked := checked.Has(po) - if !hasChecked { - rejectedPods[po] = fmt.Sprintf("webhook check [%s] error, taskId %s, err: %v", w.key(), taskId, err) - } - return hasChecked - }, taskId) - continue - } - w.recordTime(taskId, res.Message, false) - - // all failed - if !res.Success { - newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, pods, func(po string) bool { - hasChecked := checked.Has(po) - if !hasChecked { - rejectedPods[po] = fmt.Sprintf("webhook check [%s] failed, taskId: %s, err: %v", w.key(), taskId, err) - } - return hasChecked - }, taskId) + } + PollingManager.Delete(taskId) continue } - // all passed - if res.Finished { - newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, pods, func(po string) bool { - checked.Insert(po) - return true - }, taskId) - continue - } - if w.Webhook.ClientConfig.Poll.IntervalSeconds != nil { - w.updateInterval(time.Duration(*w.Webhook.ClientConfig.Poll.IntervalSeconds) * time.Second) - } - localFinished := sets.NewString(res.FinishedNames...) - // query trace - if !res.Stop { - newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, pods, func(po string) bool { - if localFinished.Has(po) { - checked.Insert(po) - } - hasChecked := checked.Has(po) - if !hasChecked { - rejectedPods[po] = fmt.Sprintf( - "webhook check [%s] rejected, will retry by taskId %s, msg: %s", - w.key(), - taskId, - res.Message, - ) - } - return hasChecked - }, taskId) - continue - } - // stop trace + // always polling + klog.Infof("polling task is running, current %d times, approved pods %v, %s, %s", pollingResult.Count, pollingResult.Approved.List(), pollingResult.Info, pollingResult.LastMessage) newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, pods, func(po string) bool { - if localFinished.Has(po) { + if pollingResult.Approved.Has(po) { checked.Insert(po) } hasChecked := checked.Has(po) if !hasChecked { rejectedPods[po] = fmt.Sprintf( - "webhook check [%s] rejected by stop task, taskId %s, msg: %v", - w.key(), + "Not approved by webhook %s, polling task %s %s %s", + w.Key, taskId, - res.Message, + pollingResult.LastMessage, + errMsg, ) } return hasChecked - }, "") + }, taskId) } effectiveSubjects.Delete(allTracingPods.List()...) @@ -309,15 +258,15 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt if err != nil { for eft := range effectiveSubjects { rejectedPods[eft] = fmt.Sprintf( - "fail to request webhook [%s], %v, traceId %s", - w.key(), + "fail to do webhook request %s, %v, traceId %s", + w.Key, err, selfTraceId, ) } klog.Errorf( - "fail to request podtransitionrule webhook [%s], pods: %v, traceId: %s, resp: %s", - w.key(), + "fail to request podtransitionrule webhook %s, pods: %v, traceId: %s, resp: %s", + w.Key, effectiveSubjects.List(), selfTraceId, utils.DumpJSON(res), @@ -331,15 +280,15 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt } taskId := getTaskId(res) klog.Infof( - "request podtransitionrule webhook [%s], pods: %v, taskId: %s, traceId: %s, resp: %s", - w.key(), + "request podtransitionrule webhook %s, pods: %v, taskId: %s, traceId: %s, resp: %s", + w.Key, effectiveSubjects.List(), taskId, selfTraceId, utils.DumpJSON(res), ) if taskId != "" { - w.recordTime(taskId, res.Message, true) + w.recordTask(taskId, res.Message, time.Time{}) } localFinished := sets.NewString(res.FinishedNames...) @@ -351,8 +300,8 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt hasChecked := checked.Has(po) if !hasChecked { rejectedPods[po] = fmt.Sprintf( - "webhook check [%s] rejected, traceId %s, taskId %s, msg: %s", - w.key(), + "webhook check %s rejected, traceId %s, taskId %s, msg: %s", + w.Key, selfTraceId, taskId, res.Message, @@ -360,6 +309,7 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt } return hasChecked }, "") + w.updateInterval(defaultInterval) } else if !shouldPoll(res) { // success, All passed newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, effectiveSubjects, func(po string) bool { @@ -368,6 +318,28 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt }, taskId) } else { // success, poll + pollUrl, err := w.getPollingUrl(taskId) + if err != nil { + for eft := range effectiveSubjects { + rejectedPods[eft] = fmt.Sprintf("fail to get %s polling config , %v", w.Key, err) + } + return &FilterResult{ + Passed: checked, + Rejected: rejectedPods, + Err: err, + RuleState: &appsv1alpha1.RuleState{Name: w.RuleName, WebhookStatus: newWebhookState}, + } + } + // add to polling manager + PollingManager.Add( + taskId, + pollUrl, + w.Webhook.ClientConfig.Poll.CABundle, + w.Key, + time.Duration(*w.Webhook.ClientConfig.Poll.TimeoutSeconds)*time.Second, + time.Duration(*w.Webhook.ClientConfig.Poll.IntervalSeconds)*time.Second, + ) + klog.Infof("%s, polling task %s initialized.", w.Key, taskId) newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, effectiveSubjects, func(po string) bool { if localFinished.Has(po) { checked.Insert(po) @@ -375,8 +347,8 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt hasChecked := checked.Has(po) if !hasChecked { rejectedPods[po] = fmt.Sprintf( - "webhook check [%s] rejected, will retry by taskId %s, msg: %s", - w.key(), + "polling task %s initialized, will polling by taskId %s, msg: %s", + w.Key, taskId, res.Message, ) @@ -409,46 +381,24 @@ func (w *Webhook) convTaskInfo(infoMap map[string]*appsv1alpha1.TaskInfo) []apps return states } -func (w *Webhook) recordTime(taskId, msg string, isFirst bool) { - timeNow := time.Now() - newRecord := &appsv1alpha1.TaskInfo{ - TaskId: taskId, - BeginTime: &metav1.Time{Time: timeNow}, - LastTime: &metav1.Time{Time: timeNow}, - Message: msg, - } - if !isFirst { - tm := w.getTaskInfo(taskId) - if tm != nil && tm.BeginTime != nil { - newRecord.BeginTime = tm.BeginTime.DeepCopy() - } - } - w.taskInfo[taskId] = newRecord -} - -func (w *Webhook) recordTimeOld(taskId string) { +func (w *Webhook) recordTask(taskId string, msg string, updateTime time.Time) { tm := w.getTaskInfo(taskId) - if tm != nil { - w.taskInfo[taskId] = &appsv1alpha1.TaskInfo{ - BeginTime: tm.BeginTime.DeepCopy(), - LastTime: tm.LastTime.DeepCopy(), - Message: tm.Message, - TaskId: taskId, - } + // first record + var beginTime, lastTime *metav1.Time + if tm == nil { + beginTime = &metav1.Time{Time: time.Now()} + } else { + beginTime = tm.BeginTime.DeepCopy() } -} - -func (w *Webhook) timeOut(taskId string) bool { - timeNow := time.Now() - timeOut := defaultTimeout - tm := w.getTaskInfo(taskId) - if tm == nil || tm.BeginTime == nil { - return false + if updateTime.After(beginTime.Time) { + lastTime = &metav1.Time{Time: updateTime} } - if w.Webhook.ClientConfig.Poll.TimeoutSeconds != nil { - timeOut = time.Duration(*w.Webhook.ClientConfig.Poll.TimeoutSeconds) * time.Second + w.taskInfo[taskId] = &appsv1alpha1.TaskInfo{ + BeginTime: beginTime, + LastTime: lastTime, + Message: msg, + TaskId: taskId, } - return timeNow.Sub(tm.BeginTime.Time) > timeOut } func (w *Webhook) getTaskInfo(traceId string) *appsv1alpha1.TaskInfo { @@ -463,35 +413,15 @@ func (w *Webhook) getTaskInfo(traceId string) *appsv1alpha1.TaskInfo { return nil } -func (w *Webhook) outInterval(traceId string) (bool, time.Duration, time.Duration) { - tm := w.getTaskInfo(traceId) - if tm == nil || tm.LastTime == nil { - return true, 0, 0 - } - interval := defaultInterval - if w.Webhook.ClientConfig.Poll.IntervalSeconds != nil { - interval = time.Duration(*w.Webhook.ClientConfig.Poll.IntervalSeconds) * time.Second +func (w *Webhook) getPollingUrl(taskId string) (string, error) { + if w.Webhook.ClientConfig.Poll == nil { + return "", fmt.Errorf("null polling config in rule %s", w.Key) } - allCost := time.Since(tm.BeginTime.Time) - nowInterval := time.Since(tm.LastTime.Time) - wait := interval - nowInterval - return nowInterval > interval, wait, allCost -} - -func (w *Webhook) polling(taskId string) (*appsv1alpha1.PollResponse, error) { pollUrl := fmt.Sprintf("%s?task-id=%s", w.Webhook.ClientConfig.Poll.URL, taskId) if w.Webhook.ClientConfig.Poll.RawQueryKey != "" { pollUrl = fmt.Sprintf("%s?%s=%s", w.Webhook.ClientConfig.Poll.URL, w.Webhook.ClientConfig.Poll.RawQueryKey, taskId) } - httpResp, err := utilshttp.DoHttpAndHttpsRequestWithCa(http.MethodGet, pollUrl, nil, nil, w.Webhook.ClientConfig.Poll.CABundle) - if err != nil { - return nil, err - } - resp := &appsv1alpha1.PollResponse{} - if err = utilshttp.ParseResponse(httpResp, resp); err != nil { - return nil, err - } - return resp, nil + return pollUrl, nil } func (w *Webhook) query(podSet sets.String) (string, *appsv1alpha1.WebhookResponse, error) { @@ -515,10 +445,6 @@ func (w *Webhook) doHttp(req *appsv1alpha1.WebhookRequest) (*appsv1alpha1.Webhoo return resp, nil } -func (w *Webhook) key() string { - return w.Key -} - func appendStatus(current []*appsv1alpha1.ItemStatus, nameSet sets.String, checkFunc func(string) bool, taskId string) []*appsv1alpha1.ItemStatus { for name := range nameSet { current = append(current, &appsv1alpha1.ItemStatus{ @@ -543,7 +469,3 @@ func getTaskId(resp *appsv1alpha1.WebhookResponse) string { } return "" } - -func alreadyApproved(po *corev1.Pod) bool { - return false -} diff --git a/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go b/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go index 496c5ecc..f03bae79 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go +++ b/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go @@ -236,7 +236,7 @@ func TestWebhookPollFail(t *testing.T) { g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(0)) g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(3)) - <-time.After(5 * time.Second) + <-time.After(6 * time.Second) state = &appsv1alpha1.RuleState{Name: web.RuleName, WebhookStatus: res.RuleState.WebhookStatus} pollRS.Status.RuleStates = []*appsv1alpha1.RuleState{state} webhooks = GetWebhook(pollRS) @@ -301,7 +301,7 @@ func TestWebhookPoll(t *testing.T) { g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(0)) g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(3)) - <-time.After(5 * time.Second) + <-time.After(6 * time.Second) state = &appsv1alpha1.RuleState{Name: web.RuleName, WebhookStatus: res.RuleState.WebhookStatus} pollRS.Status.RuleStates = []*appsv1alpha1.RuleState{state} webhooks = GetWebhook(pollRS.DeepCopy()) diff --git a/pkg/controllers/podtransitionrule/utils/check.go b/pkg/controllers/podtransitionrule/utils/check.go index 74af4fce..46a7e549 100644 --- a/pkg/controllers/podtransitionrule/utils/check.go +++ b/pkg/controllers/podtransitionrule/utils/check.go @@ -17,24 +17,23 @@ package utils import ( - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1" ) -func IsPodPassRule(po *corev1.Pod, podtransitionrule *appsv1alpha1.PodTransitionRule, rule string) bool { - passedRules := GetPodPassedRules(po, podtransitionrule) +func IsPodPassRule(podName string, podtransitionrule *appsv1alpha1.PodTransitionRule, rule string) bool { + passedRules := GetPodPassedRules(podName, podtransitionrule) return passedRules.Has(rule) } -func GetPodPassedRules(po *corev1.Pod, podtransitionrule *appsv1alpha1.PodTransitionRule) (rules sets.String) { +func GetPodPassedRules(podName string, podtransitionrule *appsv1alpha1.PodTransitionRule) (rules sets.String) { rules = sets.NewString() if podtransitionrule.Status.Details == nil { return rules } for _, detail := range podtransitionrule.Status.Details { - if detail.Name != po.Name { + if detail.Name != podName { continue } rules.Insert(detail.PassedRules...) From 0dc1076be2752d3c46b6df9d5cd9df82e399938a Mon Sep 17 00:00:00 2001 From: Eikykun Date: Mon, 25 Dec 2023 15:44:41 +0800 Subject: [PATCH 3/6] delete unused code --- .../podtransitionrule/podtransitionrule_controller.go | 3 --- pkg/controllers/podtransitionrule/processor/rules/polling.go | 2 -- 2 files changed, 5 deletions(-) diff --git a/pkg/controllers/podtransitionrule/podtransitionrule_controller.go b/pkg/controllers/podtransitionrule/podtransitionrule_controller.go index d417b990..505fb3ba 100644 --- a/pkg/controllers/podtransitionrule/podtransitionrule_controller.go +++ b/pkg/controllers/podtransitionrule/podtransitionrule_controller.go @@ -341,9 +341,6 @@ func equalStatus(updated *appsv1alpha1.PodTransitionRuleStatus, current *appsv1a equality.Semantic.DeepEqual(updated.RuleStates, current.RuleStates) && updated.ObservedGeneration == current.ObservedGeneration if !deepEqual { - fmt.Println("test: update pd status ") - fmt.Println(utils.DumpJSON(current)) - fmt.Println(utils.DumpJSON(updated)) return utils.DumpJSON(updated) == utils.DumpJSON(current) } return deepEqual diff --git a/pkg/controllers/podtransitionrule/processor/rules/polling.go b/pkg/controllers/podtransitionrule/processor/rules/polling.go index e9a1dd0e..e41f6a66 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/polling.go +++ b/pkg/controllers/podtransitionrule/processor/rules/polling.go @@ -27,7 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/util/workqueue" - "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/event" appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1" @@ -99,7 +98,6 @@ func (r *pollingRunner) worker(stop <-chan struct{}) { }() for { id, shutdown := r.q.Get() - klog.Infof("test: get id %s, time %s", id, time.Now()) if shutdown { return } From 86217859bb7be1d0988269a4f6a968ffff525bd1 Mon Sep 17 00:00:00 2001 From: Eikykun Date: Mon, 25 Dec 2023 20:52:46 +0800 Subject: [PATCH 4/6] enhance: delete PT webhook ItemStatus --- apis/apps/v1alpha1/podtransitionrule_types.go | 21 +- apis/apps/v1alpha1/zz_generated.deepcopy.go | 41 +-- ...pps.kusionstack.io_podtransitionrules.yaml | 40 ++- .../podtransitionrule_controller_test.go | 2 +- .../podtransitionrule/processor/processor.go | 7 +- .../processor/rules/types.go | 4 +- .../processor/rules/webhook.go | 329 ++++++++---------- .../processor/rules/webhook_test.go | 20 +- 8 files changed, 217 insertions(+), 247 deletions(-) diff --git a/apis/apps/v1alpha1/podtransitionrule_types.go b/apis/apps/v1alpha1/podtransitionrule_types.go index 7f6f04bb..c9aa05bb 100644 --- a/apis/apps/v1alpha1/podtransitionrule_types.go +++ b/apis/apps/v1alpha1/podtransitionrule_types.go @@ -182,19 +182,20 @@ type RuleState struct { // WebhookStatus defines the webhook processing status type WebhookStatus struct { - // PodTransitionRulePodStatus is async request status representing the info of pods - ItemStatus []*ItemStatus `json:"itemStatus,omitempty"` // TaskStates is a list of tracing info TaskStates []TaskInfo `json:"taskStates,omitempty"` + + // History records history taskStates which were finished or failed. Valid for 10 minutes + History []TaskInfo `json:"history,omitempty"` } type TaskInfo struct { TaskId string `json:"taskId,omitempty"` - //Processing []string `json:"targets,omitempty"` + Processing []string `json:"processing,omitempty"` - //Approved []string `json:"approved,omitempty"` + Approved []string `json:"approved,omitempty"` BeginTime *metav1.Time `json:"beginTime,omitempty"` @@ -203,18 +204,6 @@ type TaskInfo struct { Message string `json:"message,omitempty"` } -// ItemStatus defines async request info of resources -type ItemStatus struct { - // Name representing the name of pod - Name string `json:"name,omitempty"` - - // WebhookChecked representing the pod has pass check - WebhookChecked bool `json:"webhookChecked"` - - // TraceId representing poll request taskId - TaskId string `json:"taskId,omitempty"` -} - type PodTransitionDetail struct { // Name representing Pod name Name string `json:"name,omitempty"` diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 623a659b..0842e766 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -267,21 +267,6 @@ func (in *ContextDetail) DeepCopy() *ContextDetail { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *ItemStatus) DeepCopyInto(out *ItemStatus) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ItemStatus. -func (in *ItemStatus) DeepCopy() *ItemStatus { - if in == nil { - return nil - } - out := new(ItemStatus) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LabelCheckRule) DeepCopyInto(out *LabelCheckRule) { *out = *in @@ -759,6 +744,16 @@ func (in *ScaleStrategy) DeepCopy() *ScaleStrategy { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TaskInfo) DeepCopyInto(out *TaskInfo) { *out = *in + if in.Processing != nil { + in, out := &in.Processing, &out.Processing + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Approved != nil { + in, out := &in.Approved, &out.Approved + *out = make([]string, len(*in)) + copy(*out, *in) + } if in.BeginTime != nil { in, out := &in.BeginTime, &out.BeginTime *out = (*in).DeepCopy() @@ -963,19 +958,15 @@ func (in *WebhookResponse) DeepCopy() *WebhookResponse { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *WebhookStatus) DeepCopyInto(out *WebhookStatus) { *out = *in - if in.ItemStatus != nil { - in, out := &in.ItemStatus, &out.ItemStatus - *out = make([]*ItemStatus, len(*in)) + if in.TaskStates != nil { + in, out := &in.TaskStates, &out.TaskStates + *out = make([]TaskInfo, len(*in)) for i := range *in { - if (*in)[i] != nil { - in, out := &(*in)[i], &(*out)[i] - *out = new(ItemStatus) - **out = **in - } + (*in)[i].DeepCopyInto(&(*out)[i]) } } - if in.TaskStates != nil { - in, out := &in.TaskStates, &out.TaskStates + if in.History != nil { + in, out := &in.History, &out.History *out = make([]TaskInfo, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) diff --git a/config/crd/bases/apps.kusionstack.io_podtransitionrules.yaml b/config/crd/bases/apps.kusionstack.io_podtransitionrules.yaml index d3cc4495..23b3ffbb 100644 --- a/config/crd/bases/apps.kusionstack.io_podtransitionrules.yaml +++ b/config/crd/bases/apps.kusionstack.io_podtransitionrules.yaml @@ -374,31 +374,39 @@ spec: description: WebhookStatus is the webhook status representing processing progress properties: - itemStatus: - description: PodTransitionRulePodStatus is async request - status representing the info of pods + history: + description: History records history taskStates which were + finished or failed. Valid for 10 minutes items: - description: ItemStatus defines async request info of - resources properties: - name: - description: Name representing the name of pod + approved: + items: + type: string + type: array + beginTime: + format: date-time + type: string + lastTime: + format: date-time type: string + message: + type: string + processing: + items: + type: string + type: array taskId: - description: TraceId representing poll request taskId type: string - webhookChecked: - description: WebhookChecked representing the pod has - pass check - type: boolean - required: - - webhookChecked type: object type: array taskStates: description: TaskStates is a list of tracing info items: properties: + approved: + items: + type: string + type: array beginTime: format: date-time type: string @@ -407,6 +415,10 @@ spec: type: string message: type: string + processing: + items: + type: string + type: array taskId: type: string type: object diff --git a/pkg/controllers/podtransitionrule/podtransitionrule_controller_test.go b/pkg/controllers/podtransitionrule/podtransitionrule_controller_test.go index 21a2d56d..e3a86980 100644 --- a/pkg/controllers/podtransitionrule/podtransitionrule_controller_test.go +++ b/pkg/controllers/podtransitionrule/podtransitionrule_controller_test.go @@ -581,7 +581,7 @@ func RunPollingServer() (chan<- struct{}, <-chan struct{}, map[string]time.Time, names = append(names, resource.Name) } taskID := uuid.New().String() - fmt.Printf("handle pods %v\n, task %s", names, taskID) + fmt.Printf("handle pods %v\n, task %s\n", names, taskID) taskMap[taskID] = names taskStartTime[taskID] = time.Now() webhookResp := &appsv1alpha1.WebhookResponse{ diff --git a/pkg/controllers/podtransitionrule/processor/processor.go b/pkg/controllers/podtransitionrule/processor/processor.go index f155961d..a42903f9 100644 --- a/pkg/controllers/podtransitionrule/processor/processor.go +++ b/pkg/controllers/podtransitionrule/processor/processor.go @@ -161,9 +161,10 @@ func (p *Processor) Process(targets map[string]*corev1.Pod) *ProcessResult { } processingPods = result.Passed.Union(skipPods) - if processingPods.Len() == 0 { - break - } + // do not break: ensure update status + //if processingPods.Len() == 0 { + // break + //} } res := &ProcessResult{ diff --git a/pkg/controllers/podtransitionrule/processor/rules/types.go b/pkg/controllers/podtransitionrule/processor/rules/types.go index 1f3606aa..ea7b7d95 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/types.go +++ b/pkg/controllers/podtransitionrule/processor/rules/types.go @@ -99,7 +99,7 @@ type HttpJob struct { Action string } -func (w *Webhook) buildRequest(pods sets.String) (*appsv1alpha1.WebhookRequest, error) { +func (w *Webhook) buildRequest(pods sets.String, targets map[string]*corev1.Pod) (*appsv1alpha1.WebhookRequest, error) { req := &appsv1alpha1.WebhookRequest{ RuleName: w.RuleName, @@ -115,7 +115,7 @@ func (w *Webhook) buildRequest(pods sets.String) (*appsv1alpha1.WebhookRequest, } parameters := map[string]string{} for _, parameter := range w.Webhook.Parameters { - value, err := w.parseParameter(¶meter, w.targets[podName]) + value, err := w.parseParameter(¶meter, targets[podName]) if err != nil { return nil, fmt.Errorf("%s failed to parse parameter, %v", w.Key, err) } diff --git a/pkg/controllers/podtransitionrule/processor/rules/webhook.go b/pkg/controllers/podtransitionrule/processor/rules/webhook.go index 3ff99acf..502851ea 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/webhook.go +++ b/pkg/controllers/podtransitionrule/processor/rules/webhook.go @@ -71,12 +71,6 @@ func GetWebhook(pt *appsv1alpha1.PodTransitionRule, names ...string) (webs []*We if ruleState.WebhookStatus == nil { ruleState.WebhookStatus = &appsv1alpha1.WebhookStatus{} } - if ruleState.WebhookStatus.ItemStatus == nil { - ruleState.WebhookStatus.ItemStatus = []*appsv1alpha1.ItemStatus{} - } - if ruleState.WebhookStatus.TaskStates == nil { - ruleState.WebhookStatus.TaskStates = []appsv1alpha1.TaskInfo{} - } webs = append(webs, &Webhook{ Stage: rule.Stage, @@ -102,35 +96,17 @@ type Webhook struct { Approved func(string) bool - targets map[string]*corev1.Pod - subjects sets.String - retryInterval *time.Duration taskInfo map[string]*appsv1alpha1.TaskInfo } -func (w *Webhook) updateInterval(interval time.Duration) { - if interval >= 0 && w.retryInterval == nil || *w.retryInterval > interval { - w.retryInterval = &interval - } -} - -func (w *Webhook) setItems(targets map[string]*corev1.Pod, subjects sets.String) { - w.targets = map[string]*corev1.Pod{} - for k, v := range targets { - w.targets[k] = v - } - w.subjects = sets.NewString(subjects.List()...) -} - func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *FilterResult { - w.setItems(targets, subjects) w.taskInfo = map[string]*appsv1alpha1.TaskInfo{} - effectiveSubjects := sets.NewString(w.subjects.List()...) + effectiveSubjects := sets.NewString(subjects.List()...) checked := sets.NewString() rejectedPods := map[string]string{} - - for sub := range w.subjects { + historyTaskInfo := map[string]*appsv1alpha1.TaskInfo{} + for sub := range subjects { if w.Approved(targets[sub].Name) { effectiveSubjects.Delete(sub) checked.Insert(sub) @@ -139,53 +115,44 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt newWebhookState := &appsv1alpha1.WebhookStatus{ TaskStates: []appsv1alpha1.TaskInfo{}, - ItemStatus: []*appsv1alpha1.ItemStatus{}, } defer func() { newWebhookState.TaskStates = w.convTaskInfo(w.taskInfo) + newWebhookState.History = w.convTaskInfo(historyTaskInfo) w.State.WebhookStatus = newWebhookState }() - - taskPods := map[string]sets.String{} allTracingPods := sets.NewString() - processingTask := sets.NewString() + nowTime := time.Now() + for i, state := range w.State.WebhookStatus.History { + if state.LastTime != nil && nowTime.Sub(state.LastTime.Time) < 10*time.Minute { + historyTaskInfo[state.TaskId] = w.State.WebhookStatus.History[i].DeepCopy() + } + } - for _, state := range w.State.WebhookStatus.ItemStatus { - if !effectiveSubjects.Has(state.Name) { + for i, state := range w.State.WebhookStatus.TaskStates { + if len(state.Processing) == 0 || !effectiveSubjects.HasAny(state.Processing...) { + // invalid, move in history + historyTaskInfo[state.TaskId] = &w.State.WebhookStatus.TaskStates[i] + PollingManager.Delete(state.TaskId) continue } - //processingPods.Insert(podState.Name) - if taskPods[state.TaskId] == nil { - taskPods[state.TaskId] = sets.NewString(state.Name) - } else { - taskPods[state.TaskId].Insert(state.Name) - } + currentPods := sets.NewString(Intersection(effectiveSubjects, state.Processing)...) + checked.Insert(Intersection(effectiveSubjects, state.Approved)...) + allTracingPods.Insert(currentPods.List()...) + taskId := state.TaskId - if state.TaskId != "" { - allTracingPods.Insert(state.Name) - } + // get latest polling result + pollingResult := PollingManager.GetResult(taskId) - if state.WebhookChecked { - checked.Insert(state.Name) - } else { - processingTask.Insert(state.TaskId) + if pollingResult == nil { + // TODO: After restart... + panic("null polling result") } - } - // process trace - for taskId := range processingTask { - if taskId == "" { - continue - } - pods := taskPods[taskId] - pollingResult := PollingManager.GetResult(taskId) - w.recordTask(taskId, pollingResult.LastMessage, pollingResult.LastQueryTime) if pollingResult.ApproveAll { klog.Infof("polling task finished, approve all pods after %d times, %s, %s", pollingResult.Count, pollingResult.Info, pollingResult.LastMessage) - newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, pods, func(po string) bool { - checked.Insert(po) - return true - }, taskId) + w.recordTaskInfo(&state, pollingResult.LastMessage, pollingResult.LastQueryTime, state.Processing) + checked.Insert(currentPods.List()...) PollingManager.Delete(taskId) continue } @@ -194,51 +161,42 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt errMsg = fmt.Sprintf("polling task %s error, %v", taskId, pollingResult.LastError) klog.Warningf(errMsg) } + var rejectMsg string if pollingResult.Stopped { - klog.Infof("polling task stopped after %d times, approved pods %v, %s, %s", pollingResult.Count, pollingResult.Approved.List(), pollingResult.Info, pollingResult.LastMessage) - for po := range pods { - if pollingResult.Approved.Has(po) { - checked.Insert(po) - } - if checked.Has(po) { - newWebhookState.ItemStatus = append(newWebhookState.ItemStatus, &appsv1alpha1.ItemStatus{ - Name: po, - WebhookChecked: true, - TaskId: taskId, - }) - } else { - allTracingPods.Delete(po) - rejectedPods[po] = fmt.Sprintf( - "Not approved by webhook %s, polling task %s stoped %s %s", - w.Key, - taskId, - pollingResult.LastMessage, - errMsg, - ) - } - } + // stopped, move in history + newState := approve(state.DeepCopy(), pollingResult.Approved.List()) + newState.LastTime = &metav1.Time{Time: pollingResult.LastQueryTime} + historyTaskInfo[taskId] = newState PollingManager.Delete(taskId) - continue + klog.Infof("polling task stopped after %d times, approved pods %v, %s, %s", pollingResult.Count, pollingResult.Approved.List(), pollingResult.Info, pollingResult.LastMessage) + rejectMsg = fmt.Sprintf( + "Not approved by webhook %s, polling task %s stoped %s %s", + w.Key, + taskId, + pollingResult.LastMessage, + errMsg, + ) + } else { + w.recordTaskInfo(&state, pollingResult.LastMessage, pollingResult.LastQueryTime, pollingResult.Approved.List()) + klog.Infof("polling task is running, current %d times, approved pods %v, %s, %s", pollingResult.Count, pollingResult.Approved.List(), pollingResult.Info, pollingResult.LastMessage) + rejectMsg = fmt.Sprintf( + "Not approved by webhook %s, polling task %s is running %s %s", + w.Key, + taskId, + pollingResult.LastMessage, + errMsg, + ) } - - // always polling - klog.Infof("polling task is running, current %d times, approved pods %v, %s, %s", pollingResult.Count, pollingResult.Approved.List(), pollingResult.Info, pollingResult.LastMessage) - newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, pods, func(po string) bool { + for po := range currentPods { if pollingResult.Approved.Has(po) { checked.Insert(po) + } else { + if pollingResult.Stopped { + allTracingPods.Delete(po) + } + rejectedPods[po] = rejectMsg } - hasChecked := checked.Has(po) - if !hasChecked { - rejectedPods[po] = fmt.Sprintf( - "Not approved by webhook %s, polling task %s %s %s", - w.Key, - taskId, - pollingResult.LastMessage, - errMsg, - ) - } - return hasChecked - }, taskId) + } } effectiveSubjects.Delete(allTracingPods.List()...) @@ -254,11 +212,11 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt } // First request - selfTraceId, res, err := w.query(effectiveSubjects) + selfTraceId, res, err := w.query(effectiveSubjects, targets) if err != nil { for eft := range effectiveSubjects { rejectedPods[eft] = fmt.Sprintf( - "fail to do webhook request %s, %v, traceId %s", + "Fail to do webhook request %s, %v, traceId %s", w.Key, err, selfTraceId, @@ -287,41 +245,48 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt selfTraceId, utils.DumpJSON(res), ) - if taskId != "" { - w.recordTask(taskId, res.Message, time.Time{}) - } - localFinished := sets.NewString(res.FinishedNames...) + localFinished := sets.NewString(res.FinishedNames...) + // Prevent result tampering + processing := effectiveSubjects.Difference(localFinished).List() + approved := Intersection(effectiveSubjects, res.FinishedNames) if !res.Success { - newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, effectiveSubjects, func(po string) bool { - if localFinished.Has(po) { - checked.Insert(po) - } - hasChecked := checked.Has(po) - if !hasChecked { - rejectedPods[po] = fmt.Sprintf( - "webhook check %s rejected, traceId %s, taskId %s, msg: %s", - w.Key, - selfTraceId, - taskId, - res.Message, - ) - } - return hasChecked - }, "") + checked.Insert(approved...) + for _, po := range processing { + rejectedPods[po] = fmt.Sprintf( + "Webhook check %s rejected, traceId %s, taskId %s, msg: %s", + w.Key, + selfTraceId, + taskId, + res.Message, + ) + } + // requeue w.updateInterval(defaultInterval) } else if !shouldPoll(res) { // success, All passed - newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, effectiveSubjects, func(po string) bool { - checked.Insert(po) - return true - }, taskId) + checked.Insert(effectiveSubjects.List()...) } else { - // success, poll + // success, init poll task + // trigger reconcile by PollingManager listener + if taskId == "" { + // TODO: invalid taskID + klog.Warningf("%s handle invalid webhook response, empty taskId in polling response", w.Key) + for eft := range effectiveSubjects { + rejectedPods[eft] = fmt.Sprintf("Invalid empty taskID, request trace %s", selfTraceId) + } + return &FilterResult{ + Passed: checked, + Rejected: rejectedPods, + Err: err, + RuleState: &appsv1alpha1.RuleState{Name: w.RuleName, WebhookStatus: newWebhookState}, + } + } + pollUrl, err := w.getPollingUrl(taskId) if err != nil { for eft := range effectiveSubjects { - rejectedPods[eft] = fmt.Sprintf("fail to get %s polling config , %v", w.Key, err) + rejectedPods[eft] = fmt.Sprintf("Fail to get %s polling config , %v", w.Key, err) } return &FilterResult{ Passed: checked, @@ -340,21 +305,16 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt time.Duration(*w.Webhook.ClientConfig.Poll.IntervalSeconds)*time.Second, ) klog.Infof("%s, polling task %s initialized.", w.Key, taskId) - newWebhookState.ItemStatus = appendStatus(newWebhookState.ItemStatus, effectiveSubjects, func(po string) bool { - if localFinished.Has(po) { - checked.Insert(po) - } - hasChecked := checked.Has(po) - if !hasChecked { - rejectedPods[po] = fmt.Sprintf( - "polling task %s initialized, will polling by taskId %s, msg: %s", - w.Key, - taskId, - res.Message, - ) - } - return hasChecked - }, taskId) + w.newTaskInfo(taskId, res.Message, processing, approved) + checked.Insert(approved...) + for _, po := range processing { + rejectedPods[po] = fmt.Sprintf( + "Polling task %s initialized, will polling by taskId %s, msg: %s", + w.Key, + taskId, + res.Message, + ) + } } return &FilterResult{ @@ -381,36 +341,44 @@ func (w *Webhook) convTaskInfo(infoMap map[string]*appsv1alpha1.TaskInfo) []apps return states } -func (w *Webhook) recordTask(taskId string, msg string, updateTime time.Time) { - tm := w.getTaskInfo(taskId) - // first record - var beginTime, lastTime *metav1.Time - if tm == nil { - beginTime = &metav1.Time{Time: time.Now()} - } else { - beginTime = tm.BeginTime.DeepCopy() +func (w *Webhook) newTaskInfo(taskId string, msg string, processing, approved []string) { + w.taskInfo[taskId] = &appsv1alpha1.TaskInfo{ + BeginTime: &metav1.Time{Time: time.Now()}, + Message: msg, + TaskId: taskId, + Processing: processing, + Approved: approved, + } +} + +func (w *Webhook) recordTaskInfo(old *appsv1alpha1.TaskInfo, msg string, updateTime time.Time, approved []string) { + approvedSets := sets.NewString(old.Approved...) + processingSets := sets.NewString(old.Processing...) + for _, po := range approved { + if processingSets.Has(po) { + approvedSets.Insert(po) + } } + processingSets.Delete(approved...) + var beginTime, lastTime *metav1.Time + beginTime = old.BeginTime.DeepCopy() if updateTime.After(beginTime.Time) { lastTime = &metav1.Time{Time: updateTime} } - w.taskInfo[taskId] = &appsv1alpha1.TaskInfo{ - BeginTime: beginTime, - LastTime: lastTime, - Message: msg, - TaskId: taskId, + w.taskInfo[old.TaskId] = &appsv1alpha1.TaskInfo{ + BeginTime: beginTime, + LastTime: lastTime, + Message: msg, + TaskId: old.TaskId, + Approved: approvedSets.List(), + Processing: processingSets.List(), } } -func (w *Webhook) getTaskInfo(traceId string) *appsv1alpha1.TaskInfo { - if w.State.WebhookStatus == nil { - return nil - } - for i, state := range w.State.WebhookStatus.TaskStates { - if state.TaskId == traceId { - return &w.State.WebhookStatus.TaskStates[i] - } +func (w *Webhook) updateInterval(interval time.Duration) { + if interval >= 0 && w.retryInterval == nil || *w.retryInterval > interval { + w.retryInterval = &interval } - return nil } func (w *Webhook) getPollingUrl(taskId string) (string, error) { @@ -424,8 +392,8 @@ func (w *Webhook) getPollingUrl(taskId string) (string, error) { return pollUrl, nil } -func (w *Webhook) query(podSet sets.String) (string, *appsv1alpha1.WebhookResponse, error) { - req, err := w.buildRequest(podSet) +func (w *Webhook) query(podSet sets.String, targets map[string]*corev1.Pod) (string, *appsv1alpha1.WebhookResponse, error) { + req, err := w.buildRequest(podSet, targets) if err != nil { return req.TraceId, nil, err } @@ -445,17 +413,6 @@ func (w *Webhook) doHttp(req *appsv1alpha1.WebhookRequest) (*appsv1alpha1.Webhoo return resp, nil } -func appendStatus(current []*appsv1alpha1.ItemStatus, nameSet sets.String, checkFunc func(string) bool, taskId string) []*appsv1alpha1.ItemStatus { - for name := range nameSet { - current = append(current, &appsv1alpha1.ItemStatus{ - Name: name, - WebhookChecked: checkFunc(name), - TaskId: taskId, - }) - } - return current -} - func shouldPoll(resp *appsv1alpha1.WebhookResponse) bool { return resp.Async || resp.Poll } @@ -469,3 +426,23 @@ func getTaskId(resp *appsv1alpha1.WebhookResponse) string { } return "" } + +func approve(taskInfo *appsv1alpha1.TaskInfo, approved []string) *appsv1alpha1.TaskInfo { + approvedSets := sets.NewString(taskInfo.Approved...) + processingSets := sets.NewString(taskInfo.Processing...) + inProcessing := Intersection(processingSets, approved) + processingSets.Delete(inProcessing...) + approvedSets.Insert(inProcessing...) + taskInfo.Approved = approvedSets.List() + taskInfo.Processing = approvedSets.List() + return taskInfo +} + +func Intersection(s sets.String, t []string) (res []string) { + for _, item := range t { + if s.Has(item) { + res = append(res, item) + } + } + return res +} diff --git a/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go b/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go index f03bae79..e2aa2f4b 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go +++ b/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1" + "kusionstack.io/operating/pkg/utils" ) var ( @@ -217,8 +218,7 @@ func TestWebhookPollFail(t *testing.T) { g.Expect(len(webhooks)).Should(gomega.BeEquivalentTo(1)) web := webhooks[0] res := web.Do(targets, subjects) - rj, _ := json.Marshal(res) - fmt.Printf("res: %s\n", string(rj)) + fmt.Printf("res: %s\n", utils.DumpJSON(res)) g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(0)) g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(3)) g.Expect(res.RuleState).ShouldNot(gomega.BeNil()) @@ -229,8 +229,7 @@ func TestWebhookPollFail(t *testing.T) { webhooks = GetWebhook(pollRS) web = webhooks[0] res = web.Do(targets, subjects) - rj, _ = json.Marshal(res) - fmt.Printf("res: %s\n", string(rj)) + fmt.Printf("res: %s\n", utils.DumpJSON(res)) // reject by interval g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(0)) @@ -242,9 +241,7 @@ func TestWebhookPollFail(t *testing.T) { webhooks = GetWebhook(pollRS) web = webhooks[0] res = web.Do(targets, subjects) - rj, _ = json.Marshal(res) - fmt.Printf("res: %s\n", string(rj)) - // pass one + fmt.Printf("res: %s\n", utils.DumpJSON(res)) g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(2)) g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(1)) @@ -254,9 +251,7 @@ func TestWebhookPollFail(t *testing.T) { webhooks = GetWebhook(pollRS) web = webhooks[0] res = web.Do(targets, subjects) - rj, _ = json.Marshal(res) - fmt.Printf("res: %s\n", string(rj)) - // failed one + fmt.Printf("res: %s\n", utils.DumpJSON(res)) g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(1)) g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(2)) } @@ -371,3 +366,8 @@ func (p *podTemplate) GetPod() *corev1.Pod { }, } } + +func printJson(obj any) { + byt, _ := json.MarshalIndent(obj, "", " ") + fmt.Printf("%s\n", string(byt)) +} From c85b0f9e33db1ca36a168a3cafb3c9a8382f3f58 Mon Sep 17 00:00:00 2001 From: Eikykun Date: Tue, 26 Dec 2023 10:52:48 +0800 Subject: [PATCH 5/6] add test case --- .../processor/rules/server_test.go | 18 +++-- .../processor/rules/webhook.go | 22 +++++- .../processor/rules/webhook_test.go | 72 ++++++++++++++----- 3 files changed, 87 insertions(+), 25 deletions(-) diff --git a/pkg/controllers/podtransitionrule/processor/rules/server_test.go b/pkg/controllers/podtransitionrule/processor/rules/server_test.go index 439e7614..fa98e44f 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/server_test.go +++ b/pkg/controllers/podtransitionrule/processor/rules/server_test.go @@ -67,7 +67,7 @@ func RunHttpServer(f func(http.ResponseWriter, *http.Request), port string) (cha } func handleHttpAlwaysSuccess(resp http.ResponseWriter, req *http.Request) { - fmt.Println(req.URL) + fmt.Printf("handleHttpAlwaysSuccess, %s\n", req.URL) all, err := io.ReadAll(req.Body) if err != nil { fmt.Println(fmt.Sprintf("read body err: %s", err)) @@ -89,7 +89,7 @@ func handleHttpAlwaysSuccess(resp http.ResponseWriter, req *http.Request) { } func handleHttpAlwaysFalse(resp http.ResponseWriter, req *http.Request) { - fmt.Println(req.URL) + fmt.Printf("handleHttpAlwaysFalse, %s\n", req.URL) all, err := io.ReadAll(req.Body) if err != nil { fmt.Println(fmt.Sprintf("read body err: %s", err)) @@ -110,7 +110,7 @@ func handleHttpAlwaysFalse(resp http.ResponseWriter, req *http.Request) { } func handleHttpAlwaysSomeSucc(resp http.ResponseWriter, req *http.Request) { - fmt.Println(req.URL) + fmt.Printf("handleHttpAlwaysSomeSucc, %s\n", req.URL) all, err := io.ReadAll(req.Body) if err != nil { fmt.Println(fmt.Sprintf("read body err: %s", err)) @@ -162,7 +162,7 @@ func (s *setsTimer) getNow() (sets.String, bool) { } func handleFirstPollSucc(resp http.ResponseWriter, req *http.Request) { - fmt.Println(req.URL) + fmt.Printf("handleFirstPollSucc, %s\n", req.URL) webReq := &appsv1alpha1.WebhookRequest{} all, err := io.ReadAll(req.Body) if err != nil { @@ -188,7 +188,7 @@ func handleFirstPollSucc(resp http.ResponseWriter, req *http.Request) { } func handleHttpWithTaskIdSucc(resp http.ResponseWriter, req *http.Request) { - fmt.Println(req.URL) + fmt.Printf("handleHttpWithTaskIdSucc, %s\n", req.URL) taskId := req.URL.Query().Get("trace-id") col, ok := traceCache[taskId] @@ -216,7 +216,7 @@ func handleHttpWithTaskIdSucc(resp http.ResponseWriter, req *http.Request) { } func handleHttpWithTaskIdFail(resp http.ResponseWriter, req *http.Request) { - fmt.Println(req.URL) + fmt.Printf("handleHttpWithTaskIdFail, %s\n", req.URL) taskId := req.URL.Query().Get("trace-id") col, ok := traceCache[taskId] @@ -229,6 +229,7 @@ func handleHttpWithTaskIdFail(resp http.ResponseWriter, req *http.Request) { webhookResp = &appsv1alpha1.PollResponse{ Success: false, Message: "fail", + Stop: true, } } else { webhookResp = &appsv1alpha1.PollResponse{ @@ -242,6 +243,11 @@ func handleHttpWithTaskIdFail(resp http.ResponseWriter, req *http.Request) { resp.Write(byt) } +func handleHttpError(resp http.ResponseWriter, req *http.Request) { + fmt.Printf("handleHttpError, %s\n", req.URL) + http.Error(resp, fmt.Sprintf("test http error case"), http.StatusInternalServerError) +} + func getPods(req *appsv1alpha1.WebhookRequest) sets.String { res := sets.NewString() for _, param := range req.Resources { diff --git a/pkg/controllers/podtransitionrule/processor/rules/webhook.go b/pkg/controllers/podtransitionrule/processor/rules/webhook.go index 502851ea..c98c559d 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/webhook.go +++ b/pkg/controllers/podtransitionrule/processor/rules/webhook.go @@ -144,9 +144,26 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt // get latest polling result pollingResult := PollingManager.GetResult(taskId) + // restart case if pollingResult == nil { - // TODO: After restart... - panic("null polling result") + pollUrl, _ := w.getPollingUrl(taskId) + PollingManager.Add( + taskId, + pollUrl, + w.Webhook.ClientConfig.Poll.CABundle, + w.Key, + time.Duration(*w.Webhook.ClientConfig.Poll.TimeoutSeconds)*time.Second, + time.Duration(*w.Webhook.ClientConfig.Poll.IntervalSeconds)*time.Second, + ) + rejectMsg := fmt.Sprintf( + "Task %s polling result not found, try polling again, %s", + w.Key, + taskId, + ) + for po := range currentPods { + rejectedPods[po] = rejectMsg + } + continue } if pollingResult.ApproveAll { @@ -270,7 +287,6 @@ func (w *Webhook) Do(targets map[string]*corev1.Pod, subjects sets.String) *Filt // success, init poll task // trigger reconcile by PollingManager listener if taskId == "" { - // TODO: invalid taskID klog.Warningf("%s handle invalid webhook response, empty taskId in polling response", w.Key) for eft := range effectiveSubjects { rejectedPods[eft] = fmt.Sprintf("Invalid empty taskID, request trace %s", selfTraceId) diff --git a/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go b/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go index e2aa2f4b..635ee274 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go +++ b/pkg/controllers/podtransitionrule/processor/rules/webhook_test.go @@ -146,8 +146,7 @@ func TestWebhookAlwaysSuccess(t *testing.T) { web := webhooks[0] // 2 pass res := web.Do(targets, subjects) - rj, _ := json.Marshal(res) - fmt.Printf("res: %s", string(rj)) + fmt.Printf("res: %s", utils.DumpJSON(res)) g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(2)) } @@ -168,8 +167,7 @@ func TestWebhookAlwaysFail(t *testing.T) { g.Expect(len(webhooks)).Should(gomega.BeEquivalentTo(1)) web := webhooks[0] res := web.Do(targets, subjects) - rj, _ := json.Marshal(res) - fmt.Printf("res: %s", string(rj)) + fmt.Printf("res: %s", utils.DumpJSON(res)) g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(0)) g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(1)) } @@ -191,8 +189,7 @@ func TestWebhookFailSub(t *testing.T) { g.Expect(len(webhooks)).Should(gomega.BeEquivalentTo(1)) web := webhooks[0] res := web.Do(targets, subjects) - rj, _ := json.Marshal(res) - fmt.Printf("res: %s", string(rj)) + fmt.Printf("res: %s", utils.DumpJSON(res)) g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(2)) g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(1)) } @@ -277,8 +274,7 @@ func TestWebhookPoll(t *testing.T) { g.Expect(len(webhooks)).Should(gomega.BeEquivalentTo(1)) web := webhooks[0] res := web.Do(targets, subjects) - rj, _ := json.Marshal(res) - fmt.Printf("res: %s\n", string(rj)) + fmt.Printf("res: %s", utils.DumpJSON(res)) g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(0)) g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(3)) g.Expect(res.RuleState).ShouldNot(gomega.BeNil()) @@ -289,8 +285,7 @@ func TestWebhookPoll(t *testing.T) { webhooks = GetWebhook(pollRS) web = webhooks[0] res = web.Do(targets, subjects) - rj, _ = json.Marshal(res) - fmt.Printf("res: %s\n", string(rj)) + fmt.Printf("res: %s", utils.DumpJSON(res)) // reject by interval g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(0)) @@ -302,9 +297,7 @@ func TestWebhookPoll(t *testing.T) { webhooks = GetWebhook(pollRS.DeepCopy()) web = webhooks[0] res = web.Do(targets, subjects) - rj, _ = json.Marshal(res) - fmt.Printf("res: %s\n", string(rj)) - // pass one + fmt.Printf("res: %s", utils.DumpJSON(res)) g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(2)) g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(1)) @@ -314,13 +307,60 @@ func TestWebhookPoll(t *testing.T) { webhooks = GetWebhook(pollRS.DeepCopy()) web = webhooks[0] res = web.Do(targets, subjects) - rj, _ = json.Marshal(res) - fmt.Printf("res: %s\n", string(rj)) - // pass one + fmt.Printf("res: %s", utils.DumpJSON(res)) g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(3)) g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(0)) } +func TestWebhookPollError(t *testing.T) { + stopA, finishA := RunHttpServer(handleFirstPollSucc, "8888") + stopB, finishB := RunHttpServer(handleHttpError, "8889") + defer func() { + stopA <- struct{}{} + stopB <- struct{}{} + <-finishA + <-finishB + }() + targets := map[string]*corev1.Pod{ + "test-pod-a": (&podTemplate{Name: "test-pod-a", Ip: "1.1.1.58"}).GetPod(), + "test-pod-b": (&podTemplate{Name: "test-pod-b", Ip: "1.1.1.59"}).GetPod(), + "test-pod-c": (&podTemplate{Name: "test-pod-c", Ip: "1.1.1.60"}).GetPod(), + } + subjects := sets.NewString("test-pod-a", "test-pod-b", "test-pod-c") + g := gomega.NewGomegaWithT(t) + pollRS := poRS.DeepCopy() + webhooks := GetWebhook(pollRS) + g.Expect(len(webhooks)).Should(gomega.BeEquivalentTo(1)) + web := webhooks[0] + res := web.Do(targets, subjects) + fmt.Printf("res: %s", utils.DumpJSON(res)) + g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(0)) + g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(3)) + g.Expect(res.RuleState).ShouldNot(gomega.BeNil()) + + state := &appsv1alpha1.RuleState{Name: web.RuleName, WebhookStatus: res.RuleState.WebhookStatus} + pollRS.Status.RuleStates = []*appsv1alpha1.RuleState{state} + + webhooks = GetWebhook(pollRS) + web = webhooks[0] + res = web.Do(targets, subjects) + fmt.Printf("res: %s", utils.DumpJSON(res)) + + // reject by error + g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(0)) + g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(3)) + + <-time.After(6 * time.Second) + state = &appsv1alpha1.RuleState{Name: web.RuleName, WebhookStatus: res.RuleState.WebhookStatus} + pollRS.Status.RuleStates = []*appsv1alpha1.RuleState{state} + webhooks = GetWebhook(pollRS.DeepCopy()) + web = webhooks[0] + res = web.Do(targets, subjects) + fmt.Printf("res: %s", utils.DumpJSON(res)) + g.Expect(res.Passed.Len()).Should(gomega.BeEquivalentTo(0)) + g.Expect(len(res.Rejected)).Should(gomega.BeEquivalentTo(3)) +} + type podTemplate struct { Name string Ip string From 3ae4516ae08751f2f3361a005ee8df7b481fe1d0 Mon Sep 17 00:00:00 2001 From: Eikykun Date: Tue, 26 Dec 2023 11:10:53 +0800 Subject: [PATCH 6/6] add podtransitionrule test case --- .../podtransitionrule_controller.go | 9 ---- .../processor/rules/types.go | 8 ---- .../podtransitionrule/register/cache_test.go | 47 +++++++++++++++++++ 3 files changed, 47 insertions(+), 17 deletions(-) create mode 100644 pkg/controllers/podtransitionrule/register/cache_test.go diff --git a/pkg/controllers/podtransitionrule/podtransitionrule_controller.go b/pkg/controllers/podtransitionrule/podtransitionrule_controller.go index 505fb3ba..e5885608 100644 --- a/pkg/controllers/podtransitionrule/podtransitionrule_controller.go +++ b/pkg/controllers/podtransitionrule/podtransitionrule_controller.go @@ -301,15 +301,6 @@ func (r *PodTransitionRuleReconciler) updatePodTransitionRuleOnPod(ctx context.C }) } -func (r *PodTransitionRuleReconciler) hasRunningPod(pods *corev1.PodList) bool { - for _, pod := range pods.Items { - if pod.DeletionTimestamp == nil { - return true - } - } - return false -} - func updateDetail(details map[string]*appsv1alpha1.PodTransitionDetail, passRules *processor.ProcessResult, stage string) { for po, rules := range passRules.PassRules { var rejectInfo *appsv1alpha1.RejectInfo diff --git a/pkg/controllers/podtransitionrule/processor/rules/types.go b/pkg/controllers/podtransitionrule/processor/rules/types.go index ea7b7d95..98cb0209 100644 --- a/pkg/controllers/podtransitionrule/processor/rules/types.go +++ b/pkg/controllers/podtransitionrule/processor/rules/types.go @@ -78,14 +78,6 @@ func reject(subjects, passed sets.String, rejects map[string]string, reason stri } } -// -//type Response struct { -// Success bool `json:"success"` -// Message string `json:"message"` -// RetryByTrace bool `json:"retryByTrace"` -// Passed []string `json:"finishedNames,omitempty"` -//} - type HttpJob struct { Url string CaBundle []byte diff --git a/pkg/controllers/podtransitionrule/register/cache_test.go b/pkg/controllers/podtransitionrule/register/cache_test.go new file mode 100644 index 00000000..36835d53 --- /dev/null +++ b/pkg/controllers/podtransitionrule/register/cache_test.go @@ -0,0 +1,47 @@ +/* + Copyright 2023 The KusionStack Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package register + +import ( + "testing" + + "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func TestCache(t *testing.T) { + g := gomega.NewGomegaWithT(t) + ca := newCache() + ca.RegisterStage("stage-a", func(obj client.Object) bool { + return true + }) + ca.RegisterStage("stage-b", func(obj client.Object) bool { + return false + }) + g.Expect(ca.InStage(nil, "stage-a")).Should(gomega.BeTrue()) + g.Expect(ca.InStage(nil, "stage-b")).Should(gomega.BeFalse()) + g.Expect(len(ca.GetStages())).Should(gomega.Equal(2)) + ca.RegisterCondition("condition-a", func(obj client.Object) bool { + return true + }) + ca.RegisterCondition("condition-b", func(obj client.Object) bool { + return false + }) + g.Expect(len(ca.Conditions(nil))).Should(gomega.Equal(1)) + g.Expect(ca.Conditions(nil)[0]).Should(gomega.Equal("condition-a")) + g.Expect(len(ca.MatchConditions(nil, "xxx", "condition-a", "condition-b"))).Should(gomega.Equal(1)) +}