diff --git a/apis/apps/pub/launch_priority.go b/apis/apps/pub/launch_priority.go index 7743b725be..d4c0dbf979 100644 --- a/apis/apps/pub/launch_priority.go +++ b/apis/apps/pub/launch_priority.go @@ -20,6 +20,10 @@ const ( // ContainerLaunchPriorityEnvName is the env name that users have to define in pod container // to identity the launch priority of this container. ContainerLaunchPriorityEnvName = "KRUISE_CONTAINER_PRIORITY" + // ContainerLaunchTimeOutEnvName is the env name that users can define in a pod container to set, in seconds, how long the container may take to become ready before a launch-timeout warning event is recorded. + ContainerLaunchTimeOutEnvName = "KRUISE_CONTAINER_LAUNCH_TIMEOUT" + // ContainerLaunchPriorityUpdateTimeKey is the annotation key on the barrier ConfigMap that records, in RFC3339 format, when the barrier was last updated. + ContainerLaunchPriorityUpdateTimeKey = "apps.kruise.io/container-launch-priority-update-time" // ContainerLaunchBarrierEnvName is the env name that Kruise webhook will inject into containers // if the pod have configured launch priority. ContainerLaunchBarrierEnvName = "KRUISE_CONTAINER_BARRIER" diff --git a/pkg/controller/containerlaunchpriority/container_launch_priority_controller.go b/pkg/controller/containerlaunchpriority/container_launch_priority_controller.go index 6f9557bc14..4483d3f730 100644 --- a/pkg/controller/containerlaunchpriority/container_launch_priority_controller.go +++ b/pkg/controller/containerlaunchpriority/container_launch_priority_controller.go @@ -22,6 +22,8 @@ import ( "strconv" "time" + appspub "github.com/openkruise/kruise/apis/apps/pub" + "github.com/openkruise/kruise/pkg/util" utilclient "github.com/openkruise/kruise/pkg/util/client" utilcontainerlaunchpriority "github.com/openkruise/kruise/pkg/util/containerlaunchpriority" @@ -43,7 +45,8 @@ import ( ) const ( - concurrentReconciles = 4 + concurrentReconciles = 4 + defaultContainerLaunchTimeout = 60 ) func Add(mgr manager.Manager) error { @@ -132,6 +135,9 @@ func (r *ReconcileContainerLaunchPriority) Reconcile(_ context.Context, request } err = r.Get(context.TODO(), barrierNamespacedName, barrier) if errors.IsNotFound(err) { + 
barrier.Annotations = map[string]string{ + appspub.ContainerLaunchPriorityUpdateTimeKey: time.Now().Format(time.RFC3339), + } barrier.Namespace = pod.GetNamespace() barrier.Name = pod.Name + "-barrier" barrier.OwnerReferences = append(barrier.OwnerReferences, metav1.OwnerReference{ @@ -151,23 +157,42 @@ func (r *ReconcileContainerLaunchPriority) Reconcile(_ context.Context, request return reconcile.Result{}, err } + var requeueTime time.Duration // set next starting containers _, containersReady := podutil.GetPodCondition(&pod.Status, v1.ContainersReady) if containersReady != nil && containersReady.Status != v1.ConditionTrue { - patchKey := r.findNextPatchKey(pod) + patchKey, timeout, containers := r.findNextPatchKey(pod) if patchKey == nil { return reconcile.Result{}, nil } + updateTime := time.Now() + if barrier.Annotations != nil { + updateStr := barrier.Annotations[appspub.ContainerLaunchPriorityUpdateTimeKey] + parse, err := time.Parse(time.RFC3339, updateStr) + if err == nil { + updateTime = parse + } + } + for _, container := range containers { + containerStatus := util.GetContainerStatus(container.Name, pod) + if timeout > 0 && time.Duration(timeout)*time.Second < time.Since(updateTime) && (containerStatus == nil || containerStatus.Ready == false) { + r.recorder.Eventf(barrier, v1.EventTypeWarning, "ContainerLaunchTimeout", "Container %s has not launched successfully more than %ss.", container.Name, strconv.Itoa(timeout)) + } + } + + if time.Duration(timeout)*time.Second-time.Since(updateTime) > 0 { + requeueTime = time.Duration(timeout)*time.Second - time.Since(updateTime) + } key := "p_" + strconv.Itoa(*patchKey) if err = r.patchOnKeyNotExist(barrier, key); err != nil { return reconcile.Result{}, err } } - return reconcile.Result{}, nil + return reconcile.Result{RequeueAfter: requeueTime}, nil } -func (r *ReconcileContainerLaunchPriority) findNextPatchKey(pod *v1.Pod) *int { +func (r *ReconcileContainerLaunchPriority) findNextPatchKey(pod *v1.Pod) (*int, 
int, []*v1.Container) { var priority *int var containerPendingSet = make(map[string]bool) for _, status := range pod.Status.ContainerStatuses { @@ -176,6 +201,9 @@ func (r *ReconcileContainerLaunchPriority) findNextPatchKey(pod *v1.Pod) *int { } containerPendingSet[status.Name] = true } + + timeout := 0 + var priorityMap = make(map[int][]*v1.Container, len(pod.Spec.Containers)) for _, c := range pod.Spec.Containers { if _, ok := containerPendingSet[c.Name]; ok { p := utilcontainerlaunchpriority.GetContainerPriority(&c) @@ -184,19 +212,37 @@ func (r *ReconcileContainerLaunchPriority) findNextPatchKey(pod *v1.Pod) *int { } if priority == nil || *p > *priority { priority = p + timeout = getTimeout(c) + priorityMap[timeout] = append(priorityMap[timeout], &c) } } } - return priority + return priority, timeout, priorityMap[timeout] } func (r *ReconcileContainerLaunchPriority) patchOnKeyNotExist(barrier *v1.ConfigMap, key string) error { if _, ok := barrier.Data[key]; !ok { body := fmt.Sprintf( - `{"data":{"%s":"true"}}`, - key, + `{"data":{"%s":"true"},"metadata":{"annotations":{"%s":"%s"}}}`, + key, appspub.ContainerLaunchPriorityUpdateTimeKey, time.Now().Format(time.RFC3339), ) return r.Client.Patch(context.TODO(), barrier, client.RawPatch(types.StrategicMergePatchType, []byte(body))) } return nil } + +func parseContainerLaunchTimeOut(v string) int { + p, _ := strconv.Atoi(v) + if p < 0 { + return defaultContainerLaunchTimeout + } + return p +} +func getTimeout(c v1.Container) int { + for _, e := range c.Env { + if e.Name == appspub.ContainerLaunchTimeOutEnvName { + return parseContainerLaunchTimeOut(e.Value) + } + } + return 0 +} diff --git a/pkg/controller/containerlaunchpriority/container_launch_priority_controller_test.go b/pkg/controller/containerlaunchpriority/container_launch_priority_controller_test.go index 26b7939b5b..d40316e62f 100644 --- a/pkg/controller/containerlaunchpriority/container_launch_priority_controller_test.go +++ 
b/pkg/controller/containerlaunchpriority/container_launch_priority_controller_test.go @@ -19,6 +19,7 @@ package containerlauchpriority import ( "context" "testing" + "time" appspub "github.com/openkruise/kruise/apis/apps/pub" v1 "k8s.io/api/core/v1" @@ -81,6 +82,9 @@ func TestReconcile(t *testing.T) { Key: "p_1000", }, }, + }, { + Name: appspub.ContainerLaunchTimeOutEnvName, + Value: "3", }}, }}, }, @@ -103,10 +107,22 @@ func TestReconcile(t *testing.T) { } barrier0 := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceDefault, Name: "pod0-barrier"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pod0-barrier", + Annotations: map[string]string{ + appspub.ContainerLaunchPriorityUpdateTimeKey: time.Now().Format(time.RFC3339), + }, + }, } barrier1 := &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Namespace: metav1.NamespaceDefault, Name: "pod1-barrier"}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: metav1.NamespaceDefault, + Name: "pod1-barrier", + Annotations: map[string]string{ + appspub.ContainerLaunchPriorityUpdateTimeKey: time.Now().Format(time.RFC3339), + }, + }, } fakeClient := fake.NewFakeClientWithScheme(clientgoscheme.Scheme, pod0, pod1, barrier0, barrier1) @@ -135,9 +151,40 @@ func TestReconcile(t *testing.T) { if err := fakeClient.Get(context.TODO(), types.NamespacedName{Namespace: barrier1.Namespace, Name: barrier1.Name}, newBarrier1); err != nil { t.Fatal(err) } - if v, ok := newBarrier1.Data["p_1000"]; !ok { + if _, ok := newBarrier1.Data["p_1000"]; !ok { t.Fatalf("expect barrier1 env set, but not") - } else if v != "true" { - t.Fatalf("expect barrier1 p_1000 to be true, but get %s", v) } + + if _, ok := newBarrier1.Data["p_100"]; ok { + t.Fatalf("expect barrier1 p_100 not to be set, but get ") + } + time.Sleep(time.Second * 3) + _, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Namespace: pod1.Namespace, Name: pod1.Name}}) + if err != nil { + 
t.Fatal(err) + } + es := v1.EventList{} + fakeClient.List(context.Background(), &es) + if len(es.Items) != 1 { + t.Fatal("expect a event") + } + pod1.Status.ContainerStatuses[0].Ready = true + err = fakeClient.Update(context.Background(), pod1) + if err != nil { + t.Fatal(err) + } + _, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Namespace: pod1.Namespace, Name: pod1.Name}}) + if err != nil { + t.Fatal(err) + } + + newBarrier1 = &v1.ConfigMap{} + if err := fakeClient.Get(context.TODO(), types.NamespacedName{Namespace: barrier1.Namespace, Name: barrier1.Name}, newBarrier1); err != nil { + t.Fatal(err) + } + + if _, ok := newBarrier1.Data["p_100"]; !ok { + t.Fatalf("expect barrier1 p_100 to be set, but not ") + } + }