From 8ce869ca19e7d2e0fcb92ee3b5047995dcd8ce8f Mon Sep 17 00:00:00 2001 From: Amanuel Engeda <74629455+engedaam@users.noreply.github.com> Date: Sun, 17 Nov 2024 21:46:53 -0800 Subject: [PATCH] feat: Node Repair implementation (#1793) --- kwok/cloudprovider/cloudprovider.go | 4 + kwok/main.go | 1 + pkg/cloudprovider/fake/cloudprovider.go | 13 + pkg/cloudprovider/types.go | 14 + pkg/controllers/controllers.go | 15 +- pkg/controllers/node/health/controller.go | 132 ++++++++ pkg/controllers/node/health/suite_test.go | 298 ++++++++++++++++++ .../nodeclaim/lifecycle/controller.go | 8 +- pkg/operator/options/options.go | 6 +- pkg/operator/options/suite_test.go | 10 +- pkg/test/options.go | 2 + 11 files changed, 497 insertions(+), 6 deletions(-) create mode 100644 pkg/controllers/node/health/controller.go create mode 100644 pkg/controllers/node/health/suite_test.go diff --git a/kwok/cloudprovider/cloudprovider.go b/kwok/cloudprovider/cloudprovider.go index 2d5477f366..793d3fd756 100644 --- a/kwok/cloudprovider/cloudprovider.go +++ b/kwok/cloudprovider/cloudprovider.go @@ -130,6 +130,10 @@ func (c CloudProvider) GetSupportedNodeClasses() []status.Object { return []status.Object{&v1alpha1.KWOKNodeClass{}} } +func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy { + return []cloudprovider.RepairPolicy{} +} + func (c CloudProvider) getInstanceType(instanceTypeName string) (*cloudprovider.InstanceType, error) { it, found := lo.Find(c.instanceTypes, func(it *cloudprovider.InstanceType) bool { return it.Name == instanceTypeName diff --git a/kwok/main.go b/kwok/main.go index 4960aba329..a84c6e0a0f 100644 --- a/kwok/main.go +++ b/kwok/main.go @@ -34,6 +34,7 @@ func main() { cloudProvider := kwok.NewCloudProvider(ctx, op.GetClient(), instanceTypes) op. WithControllers(ctx, controllers.NewControllers( + ctx, op.Manager, op.Clock, op.GetClient(), diff --git a/pkg/cloudprovider/fake/cloudprovider.go b/pkg/cloudprovider/fake/cloudprovider.go index 357feac4c5..b00959458b 100644 --- a/pkg/cloudprovider/fake/cloudprovider.go +++ b/pkg/cloudprovider/fake/cloudprovider.go @@ -22,6 +22,7 @@ import ( "math" "sort" "sync" + "time" "github.com/awslabs/operatorpkg/status" "github.com/samber/lo" @@ -59,6 +60,7 @@ type CloudProvider struct { CreatedNodeClaims map[string]*v1.NodeClaim Drifted cloudprovider.DriftReason NodeClassGroupVersionKind []schema.GroupVersionKind + RepairPolicy []cloudprovider.RepairPolicy } func NewCloudProvider() *CloudProvider { @@ -93,6 +95,13 @@ func (c *CloudProvider) Reset() { Kind: "", }, } + c.RepairPolicy = []cloudprovider.RepairPolicy{ + { + ConditionType: "BadNode", + ConditionStatus: corev1.ConditionFalse, + TolerationDuration: 30 * time.Minute, + }, + } } func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) { @@ -262,6 +271,10 @@ func (c *CloudProvider) IsDrifted(context.Context, *v1.NodeClaim) (cloudprovider return c.Drifted, nil } +func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy { + return c.RepairPolicy +} + // Name returns the CloudProvider implementation name. func (c *CloudProvider) Name() string { return "fake" diff --git a/pkg/cloudprovider/types.go b/pkg/cloudprovider/types.go index 6c7f986230..6696c6e0ac 100644 --- a/pkg/cloudprovider/types.go +++ b/pkg/cloudprovider/types.go @@ -23,6 +23,7 @@ import ( "math" "sort" "sync" + "time" "github.com/awslabs/operatorpkg/status" "github.com/samber/lo" @@ -41,6 +42,16 @@ var ( type DriftReason string +type RepairPolicy struct { + // ConditionType of unhealthy state that is found on the node + ConditionType corev1.NodeConditionType + // ConditionStatus condition when a node is unhealthy + ConditionStatus corev1.ConditionStatus + // TolerationDuration is the duration the controller will wait + // before force terminating nodes that are unhealthy. + TolerationDuration time.Duration +} + // CloudProvider interface is implemented by cloud providers to support provisioning. type CloudProvider interface { // Create launches a NodeClaim with the given resource requests and requirements and returns a hydrated @@ -63,6 +74,9 @@ type CloudProvider interface { // IsDrifted returns whether a NodeClaim has drifted from the provisioning requirements // it is tied to. IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error) + // RepairPolicy is for CloudProviders to define a set Unhealthy condition for Karpenter + // to monitor on the node. + RepairPolicies() []RepairPolicy // Name returns the CloudProvider implementation name. Name() string // GetSupportedNodeClasses returns CloudProvider NodeClass that implements status.Object diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index 7eb832f05a..2b8128c39c 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -17,6 +17,8 @@ limitations under the License. package controllers import ( + "context" + "github.com/awslabs/operatorpkg/controller" "github.com/awslabs/operatorpkg/status" "k8s.io/utils/clock" @@ -32,6 +34,7 @@ import ( metricsnode "sigs.k8s.io/karpenter/pkg/controllers/metrics/node" metricsnodepool "sigs.k8s.io/karpenter/pkg/controllers/metrics/nodepool" metricspod "sigs.k8s.io/karpenter/pkg/controllers/metrics/pod" + "sigs.k8s.io/karpenter/pkg/controllers/node/health" "sigs.k8s.io/karpenter/pkg/controllers/node/termination" "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" nodeclaimconsistency "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/consistency" @@ -48,9 +51,11 @@ import ( "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/controllers/state/informer" "sigs.k8s.io/karpenter/pkg/events" + "sigs.k8s.io/karpenter/pkg/operator/options" ) func NewControllers( + ctx context.Context, mgr manager.Manager, clock clock.Clock, kubeClient client.Client, @@ -63,7 +68,7 @@ func NewControllers( evictionQueue := terminator.NewQueue(kubeClient, recorder) disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p) - return []controller.Controller{ + controllers := []controller.Controller{ p, evictionQueue, disruptionQueue, disruption.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster, disruptionQueue), provisioning.NewPodController(kubeClient, p), @@ -90,5 +95,13 @@ func NewControllers( status.NewController[*v1.NodeClaim](kubeClient, mgr.GetEventRecorderFor("karpenter"), status.EmitDeprecatedMetrics), status.NewController[*v1.NodePool](kubeClient, mgr.GetEventRecorderFor("karpenter"), status.EmitDeprecatedMetrics), status.NewGenericObjectController[*corev1.Node](kubeClient, mgr.GetEventRecorderFor("karpenter")), + health.NewController(kubeClient, cloudProvider, clock), + } + + // The cloud provider must define status conditions for the node repair controller to use to detect unhealthy nodes + if len(cloudProvider.RepairPolicies()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair { + controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock)) } + + return controllers } diff --git a/pkg/controllers/node/health/controller.go b/pkg/controllers/node/health/controller.go new file mode 100644 index 0000000000..dd4258b0fd --- /dev/null +++ b/pkg/controllers/node/health/controller.go @@ -0,0 +1,132 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package health + +import ( + "context" + "time" + + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + "k8s.io/utils/clock" + controllerruntime "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/metrics" + "sigs.k8s.io/karpenter/pkg/operator/injection" + nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" +) + +// Controller for the resource +type Controller struct { + clock clock.Clock + kubeClient client.Client + cloudProvider cloudprovider.CloudProvider +} + +// NewController constructs a controller instance +func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, clock clock.Clock) *Controller { + return &Controller{ + clock: clock, + kubeClient: kubeClient, + cloudProvider: cloudProvider, + } +} + +func (c *Controller) Register(_ context.Context, m manager.Manager) error { + return controllerruntime.NewControllerManagedBy(m). + Named("node.health"). + For(&corev1.Node{}). + Complete(reconcile.AsReconciler(m.GetClient(), c)) +} + +func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcile.Result, error) { + ctx = injection.WithControllerName(ctx, "node.health") + ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(node.Namespace, node.Name))) + + // Validate that the node is owned by us + nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node) + if err != nil { + return reconcile.Result{}, nodeutils.IgnoreNodeClaimNotFoundError(err) + } + + unhealthyNodeCondition, policyTerminationDuration := c.findUnhealthyConditions(node) + if unhealthyNodeCondition == nil { + return reconcile.Result{}, nil + } + + // If the Node is unhealthy, but has not reached it's full toleration disruption + // requeue at the termination time of the unhealthy node + terminationTime := unhealthyNodeCondition.LastTransitionTime.Add(policyTerminationDuration) + if c.clock.Now().Before(terminationTime) { + return reconcile.Result{RequeueAfter: terminationTime.Sub(c.clock.Now())}, nil + } + + // For unhealthy past the tolerationDisruption window we can forcefully terminate the node + if err := c.annotateTerminationGracePeriod(ctx, nodeClaim); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(err) + } + if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil { + return reconcile.Result{}, client.IgnoreNotFound(err) + } + + // The deletion timestamp has successfully been set for the Node, update relevant metrics. + log.FromContext(ctx).V(1).Info("deleting unhealthy node") + metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{ + metrics.ReasonLabel: string(unhealthyNodeCondition.Type), + metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey], + metrics.CapacityTypeLabel: node.Labels[v1.CapacityTypeLabelKey], + }) + return reconcile.Result{}, nil +} + +// Find a node with a condition that matches one of the unhealthy conditions defined by the cloud provider +// If there are multiple unhealthy status condition we will requeue based on the condition closest to its terminationDuration +func (c *Controller) findUnhealthyConditions(node *corev1.Node) (nc *corev1.NodeCondition, cpTerminationDuration time.Duration) { + requeueTime := time.Time{} + for _, policy := range c.cloudProvider.RepairPolicies() { + // check the status and the type on the condition + nodeCondition := nodeutils.GetCondition(node, policy.ConditionType) + if nodeCondition.Status == policy.ConditionStatus { + terminationTime := nodeCondition.LastTransitionTime.Add(policy.TolerationDuration) + // Determine requeue time + if requeueTime.IsZero() || requeueTime.After(terminationTime) { + nc = lo.ToPtr(nodeCondition) + cpTerminationDuration = policy.TolerationDuration + requeueTime = terminationTime + } + } + } + return nc, cpTerminationDuration +} + +func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeClaim *v1.NodeClaim) error { + stored := nodeClaim.DeepCopy() + nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: c.clock.Now().Format(time.RFC3339)}) + + if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil { + return err + } + + return nil +} diff --git a/pkg/controllers/node/health/suite_test.go b/pkg/controllers/node/health/suite_test.go new file mode 100644 index 0000000000..4d9d8a9338 --- /dev/null +++ b/pkg/controllers/node/health/suite_test.go @@ -0,0 +1,298 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package health_test + +import ( + "context" + "fmt" + "testing" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clock "k8s.io/utils/clock/testing" + "sigs.k8s.io/controller-runtime/pkg/cache" + + "sigs.k8s.io/controller-runtime/pkg/client" + + "sigs.k8s.io/karpenter/pkg/apis" + v1 "sigs.k8s.io/karpenter/pkg/apis/v1" + "sigs.k8s.io/karpenter/pkg/cloudprovider" + "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" + "sigs.k8s.io/karpenter/pkg/controllers/node/health" + "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" + "sigs.k8s.io/karpenter/pkg/metrics" + "sigs.k8s.io/karpenter/pkg/test" + . "sigs.k8s.io/karpenter/pkg/test/expectations" + "sigs.k8s.io/karpenter/pkg/test/v1alpha1" + . "sigs.k8s.io/karpenter/pkg/utils/testing" +) + +var ctx context.Context +var healthController *health.Controller +var env *test.Environment +var fakeClock *clock.FakeClock +var cloudProvider *fake.CloudProvider +var recorder *test.EventRecorder +var queue *terminator.Queue + +func TestAPIs(t *testing.T) { + ctx = TestContextWithLogger(t) + RegisterFailHandler(Fail) + RunSpecs(t, "Termination") +} + +var _ = BeforeSuite(func() { + fakeClock = clock.NewFakeClock(time.Now()) + env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.VolumeAttachmentFieldIndexer(ctx), func(c cache.Cache) error { + return c.IndexField(ctx, &corev1.Node{}, "spec.providerID", func(obj client.Object) []string { + return []string{obj.(*corev1.Node).Spec.ProviderID} + }) + })) + cloudProvider = fake.NewCloudProvider() + cloudProvider = fake.NewCloudProvider() + recorder = test.NewEventRecorder() + queue = terminator.NewTestingQueue(env.Client, recorder) + healthController = health.NewController(env.Client, cloudProvider, fakeClock) +}) + +var _ = AfterSuite(func() { + Expect(env.Stop()).To(Succeed(), "Failed to stop environment") +}) + +var _ = Describe("Node Health", func() { + var node *corev1.Node + var nodeClaim *v1.NodeClaim + var nodePool *v1.NodePool + + BeforeEach(func() { + fakeClock.SetTime(time.Now()) + cloudProvider.Reset() + + nodePool = test.NodePool() + nodeClaim, node = test.NodeClaimAndNode(v1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1.TerminationFinalizer}}}) + node.Labels[v1.NodePoolLabelKey] = nodePool.Name + nodeClaim.Labels[v1.NodePoolLabelKey] = nodePool.Name + cloudProvider.CreatedNodeClaims[node.Spec.ProviderID] = nodeClaim + }) + + AfterEach(func() { + ExpectCleanedUp(ctx, env.Client) + + // Reset the metrics collectors + metrics.NodeClaimsDisruptedTotal.Reset() + }) + + Context("Reconciliation", func() { + It("should delete nodes that are unhealthy by the cloud provider", func() { + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, + }) + fakeClock.Step(60 * time.Minute) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + // Determine to delete unhealthy nodes + ExpectObjectReconciled(ctx, env.Client, healthController, node) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil()) + }) + It("should not delete node when unhealthy type does not match cloud provider passed in value", func() { + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "FakeHealthyNode", + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, + }) + fakeClock.Step(60 * time.Minute) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + // Determine to delete unhealthy nodes + ExpectObjectReconciled(ctx, env.Client, healthController, node) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.DeletionTimestamp).To(BeNil()) + }) + It("should not delete node when health status does not match cloud provider passed in value", func() { + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionTrue, + LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, + }) + fakeClock.Step(60 * time.Minute) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + // Determine to delete unhealthy nodes + ExpectObjectReconciled(ctx, env.Client, healthController, node) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.DeletionTimestamp).To(BeNil()) + }) + It("should not delete node when health duration is not reached", func() { + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + // We expect the last transition for HealthyNode condition to wait 30 minutes + LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, + }) + fakeClock.Step(20 * time.Minute) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + // Determine to delete unhealthy nodes + ExpectObjectReconciled(ctx, env.Client, healthController, node) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.DeletionTimestamp).To(BeNil()) + }) + It("should set annotation termination grace period when force termination is started", func() { + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + // We expect the last transition for HealthyNode condition to wait 30 minutes + LastTransitionTime: metav1.Time{Time: time.Now()}, + }) + fakeClock.Step(60 * time.Minute) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + // Determine to delete unhealthy nodes + ExpectObjectReconciled(ctx, env.Client, healthController, node) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.Annotations).To(HaveKeyWithValue(v1.NodeClaimTerminationTimestampAnnotationKey, fakeClock.Now().Format(time.RFC3339))) + }) + It("should not respect termination grace period if set on the nodepool", func() { + nodeClaim.Spec.TerminationGracePeriod = &metav1.Duration{Duration: time.Minute} + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + // We expect the last transition for HealthyNode condition to wait 30 minutes + LastTransitionTime: metav1.Time{Time: time.Now()}, + }) + fakeClock.Step(60 * time.Minute) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + // Determine to delete unhealthy nodes + ExpectObjectReconciled(ctx, env.Client, healthController, node) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.Annotations).To(HaveKeyWithValue(v1.NodeClaimTerminationTimestampAnnotationKey, fakeClock.Now().Format(time.RFC3339))) + }) + It("should return the requeue interval for the condition closest to its terminationDuration", func() { + cloudProvider.RepairPolicy = []cloudprovider.RepairPolicy{ + { + ConditionType: "BadNode", + ConditionStatus: corev1.ConditionFalse, + TolerationDuration: 60 * time.Minute, + }, + { + ConditionType: "ValidUnhealthyCondition", + ConditionStatus: corev1.ConditionFalse, + TolerationDuration: 30 * time.Minute, + }, + } + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "ValidUnhealthyCondition", + Status: corev1.ConditionFalse, + // We expect the last transition for HealthyNode condition to wait 30 minutes + LastTransitionTime: metav1.Time{Time: time.Now()}, + }, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + // We expect the last transition for HealthyNode condition to wait 30 minutes + LastTransitionTime: metav1.Time{Time: time.Now()}, + }) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + + fakeClock.Step(27 * time.Minute) + + result := ExpectObjectReconciled(ctx, env.Client, healthController, node) + fmt.Println(result.RequeueAfter.String()) + Expect(result.RequeueAfter).To(BeNumerically("~", time.Minute*3, time.Second)) + }) + It("should return the requeue interval for the time between now and when the nodeClaim termination time", func() { + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + // We expect the last transition for HealthyNode condition to wait 30 minutes + LastTransitionTime: metav1.Time{Time: time.Now()}, + }) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + + fakeClock.Step(27 * time.Minute) + + result := ExpectObjectReconciled(ctx, env.Client, healthController, node) + Expect(result.RequeueAfter).To(BeNumerically("~", time.Minute*3, time.Second)) + }) + }) + + Context("Forceful termination", func() { + It("should ignore node disruption budgets", func() { + // Blocking disruption budgets + nodePool.Spec.Disruption = v1.Disruption{ + Budgets: []v1.Budget{ + { + Nodes: "0", + }, + }, + } + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, + }) + fakeClock.Step(60 * time.Minute) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + // Determine to delete unhealthy nodes + ExpectObjectReconciled(ctx, env.Client, healthController, node) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil()) + }) + It("should ignore do-not-disrupt on a node", func() { + node.Annotations = map[string]string{v1.DoNotDisruptAnnotationKey: "true"} + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, + }) + fakeClock.Step(60 * time.Minute) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + // Determine to delete unhealthy nodes + ExpectObjectReconciled(ctx, env.Client, healthController, node) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil()) + }) + }) + Context("Metrics", func() { + It("should fire a karpenter_nodeclaims_disrupted_total metric when unhealthy", func() { + node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{ + Type: "BadNode", + Status: corev1.ConditionFalse, + LastTransitionTime: metav1.Time{Time: fakeClock.Now()}, + }) + fakeClock.Step(60 * time.Minute) + ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node) + + ExpectObjectReconciled(ctx, env.Client, healthController, node) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.DeletionTimestamp).ToNot(BeNil()) + + ExpectMetricCounterValue(metrics.NodeClaimsDisruptedTotal, 1, map[string]string{ + metrics.ReasonLabel: string(cloudProvider.RepairPolicies()[0].ConditionType), + metrics.NodePoolLabel: nodePool.Name, + }) + }) + }) +}) diff --git a/pkg/controllers/nodeclaim/lifecycle/controller.go b/pkg/controllers/nodeclaim/lifecycle/controller.go index 7e33b23092..35b149f409 100644 --- a/pkg/controllers/nodeclaim/lifecycle/controller.go +++ b/pkg/controllers/nodeclaim/lifecycle/controller.go @@ -170,6 +170,9 @@ func (c *Controller) finalize(ctx context.Context, nodeClaim *v1.NodeClaim) (rec return reconcile.Result{}, nil } if err := c.ensureTerminationGracePeriodTerminationTimeAnnotation(ctx, nodeClaim); err != nil { + if errors.IsConflict(err) { + return reconcile.Result{Requeue: true}, nil + } return reconcile.Result{}, fmt.Errorf("adding nodeclaim terminationGracePeriod annotation, %w", err) } @@ -265,7 +268,10 @@ func (c *Controller) annotateTerminationGracePeriodTerminationTime(ctx context.C stored := nodeClaim.DeepCopy() nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: terminationTime}) - if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil { + // We use client.MergeFromWithOptimisticLock because patching a terminationGracePeriod annotation + // can cause races with the health controller, as that controller sets the current time as the terminationGracePeriod annotation + // Here, We want to resolve any conflict and not overwrite the terminationGracePeriod annotation + if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil { return client.IgnoreNotFound(err) } log.FromContext(ctx).WithValues(v1.NodeClaimTerminationTimestampAnnotationKey, terminationTime).Info("annotated nodeclaim") diff --git a/pkg/operator/options/options.go b/pkg/operator/options/options.go index 33abab5046..49fec1c314 100644 --- a/pkg/operator/options/options.go +++ b/pkg/operator/options/options.go @@ -42,6 +42,7 @@ type FeatureGates struct { inputStr string SpotToSpotConsolidation bool + NodeRepair bool } // Options contains all CLI flags / env vars for karpenter-core. It adheres to the options.Injectable interface. @@ -97,7 +98,7 @@ func (o *Options) AddFlags(fs *FlagSet) { fs.StringVar(&o.LogErrorOutputPaths, "log-error-output-paths", env.WithDefaultString("LOG_ERROR_OUTPUT_PATHS", "stderr"), "Optional comma separated paths for logging error output") fs.DurationVar(&o.BatchMaxDuration, "batch-max-duration", env.WithDefaultDuration("BATCH_MAX_DURATION", 10*time.Second), "The maximum length of a batch window. The longer this is, the more pods we can consider for provisioning at one time which usually results in fewer but larger nodes.") fs.DurationVar(&o.BatchIdleDuration, "batch-idle-duration", env.WithDefaultDuration("BATCH_IDLE_DURATION", time.Second), "The maximum amount of time with no new pending pods that if exceeded ends the current batching window. If pods arrive faster than this time, the batching window will be extended up to the maxDuration. If they arrive slower, the pods will be batched separately.") - fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. Current options are: SpotToSpotConsolidation") + fs.StringVar(&o.FeatureGates.inputStr, "feature-gates", env.WithDefaultString("FEATURE_GATES", "NodeRepair=false,SpotToSpotConsolidation=false"), "Optional features can be enabled / disabled using feature gates. Current options are: SpotToSpotConsolidation") } func (o *Options) Parse(fs *FlagSet, args ...string) error { @@ -132,6 +133,9 @@ func ParseFeatureGates(gateStr string) (FeatureGates, error) { if err := cliflag.NewMapStringBool(&gateMap).Set(gateStr); err != nil { return gates, err } + if val, ok := gateMap["NodeRepair"]; ok { + gates.NodeRepair = val + } if val, ok := gateMap["SpotToSpotConsolidation"]; ok { gates.SpotToSpotConsolidation = val } diff --git a/pkg/operator/options/suite_test.go b/pkg/operator/options/suite_test.go index 2b23eb32a4..e9630c8555 100644 --- a/pkg/operator/options/suite_test.go +++ b/pkg/operator/options/suite_test.go @@ -111,6 +111,7 @@ var _ = Describe("Options", func() { BatchMaxDuration: lo.ToPtr(10 * time.Second), BatchIdleDuration: lo.ToPtr(time.Second), FeatureGates: test.FeatureGates{ + NodeRepair: lo.ToPtr(false), SpotToSpotConsolidation: lo.ToPtr(false), }, })) @@ -136,7 +137,7 @@ var _ = Describe("Options", func() { "--log-error-output-paths", "/etc/k8s/testerror", "--batch-max-duration", "5s", "--batch-idle-duration", "5s", - "--feature-gates", "SpotToSpotConsolidation=true", + "--feature-gates", "SpotToSpotConsolidation=true,NodeRepair=true", ) Expect(err).To(BeNil()) expectOptionsMatch(opts, test.Options(test.OptionsFields{ @@ -156,6 +157,7 @@ var _ = Describe("Options", func() { BatchMaxDuration: lo.ToPtr(5 * time.Second), BatchIdleDuration: lo.ToPtr(5 * time.Second), FeatureGates: test.FeatureGates{ + NodeRepair: lo.ToPtr(true), SpotToSpotConsolidation: lo.ToPtr(true), }, })) @@ -177,7 +179,7 @@ var _ = Describe("Options", func() { os.Setenv("LOG_ERROR_OUTPUT_PATHS", "/etc/k8s/testerror") os.Setenv("BATCH_MAX_DURATION", "5s") os.Setenv("BATCH_IDLE_DURATION", "5s") - os.Setenv("FEATURE_GATES", "SpotToSpotConsolidation=true") + os.Setenv("FEATURE_GATES", "SpotToSpotConsolidation=true,NodeRepair=true") fs = &options.FlagSet{ FlagSet: flag.NewFlagSet("karpenter", flag.ContinueOnError), } @@ -201,6 +203,7 @@ var _ = Describe("Options", func() { BatchMaxDuration: lo.ToPtr(5 * time.Second), BatchIdleDuration: lo.ToPtr(5 * time.Second), FeatureGates: test.FeatureGates{ + NodeRepair: lo.ToPtr(true), SpotToSpotConsolidation: lo.ToPtr(true), }, })) @@ -217,7 +220,7 @@ var _ = Describe("Options", func() { os.Setenv("LOG_LEVEL", "debug") os.Setenv("BATCH_MAX_DURATION", "5s") os.Setenv("BATCH_IDLE_DURATION", "5s") - os.Setenv("FEATURE_GATES", "SpotToSpotConsolidation=true") + os.Setenv("FEATURE_GATES", "SpotToSpotConsolidation=true,NodeRepair=true") fs = &options.FlagSet{ FlagSet: flag.NewFlagSet("karpenter", flag.ContinueOnError), } @@ -246,6 +249,7 @@ var _ = Describe("Options", func() { BatchMaxDuration: lo.ToPtr(5 * time.Second), BatchIdleDuration: lo.ToPtr(5 * time.Second), FeatureGates: test.FeatureGates{ + NodeRepair: lo.ToPtr(true), SpotToSpotConsolidation: lo.ToPtr(true), }, })) diff --git a/pkg/test/options.go b/pkg/test/options.go index 8e19b30ffe..a6c1e2a1e9 100644 --- a/pkg/test/options.go +++ b/pkg/test/options.go @@ -47,6 +47,7 @@ type OptionsFields struct { } type FeatureGates struct { + NodeRepair *bool SpotToSpotConsolidation *bool } @@ -73,6 +74,7 @@ func Options(overrides ...OptionsFields) *options.Options { BatchMaxDuration: lo.FromPtrOr(opts.BatchMaxDuration, 10*time.Second), BatchIdleDuration: lo.FromPtrOr(opts.BatchIdleDuration, time.Second), FeatureGates: options.FeatureGates{ + NodeRepair: lo.FromPtrOr(opts.FeatureGates.NodeRepair, false), SpotToSpotConsolidation: lo.FromPtrOr(opts.FeatureGates.SpotToSpotConsolidation, false), }, }