Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func main() {
op.InstanceTypesProvider,
op.CapacityReservationProvider,
op.AMIResolver,
op.InstanceStatusProvider,
)...).
Start(ctx)
}
1 change: 1 addition & 0 deletions kwok/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func main() {
op.InstanceTypesProvider,
op.CapacityReservationProvider,
op.AMIResolver,
op.InstanceStatusProvider,
)...).
Start(ctx)
wg.Wait()
Expand Down
5 changes: 5 additions & 0 deletions kwok/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation"
"github.com/aws/karpenter-provider-aws/pkg/providers/instance"
"github.com/aws/karpenter-provider-aws/pkg/providers/instanceprofile"
"github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus"
"github.com/aws/karpenter-provider-aws/pkg/providers/instancetype"
"github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate"
"github.com/aws/karpenter-provider-aws/pkg/providers/pricing"
Expand Down Expand Up @@ -91,6 +92,7 @@ type Operator struct {
VersionProvider *version.DefaultProvider
InstanceTypesProvider *instancetype.DefaultProvider
InstanceProvider instance.Provider
InstanceStatusProvider *instancestatus.DefaultProvider
SSMProvider ssmp.Provider
CapacityReservationProvider capacityreservation.Provider
EC2API *kwokec2.Client
Expand Down Expand Up @@ -192,6 +194,8 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval),
)

instanceStatusProvider := instancestatus.NewDefaultProvider(ec2api, operator.Clock)

// Setup field indexers on instanceID -- specifically for the interruption controller
if options.FromContext(ctx).InterruptionQueue != "" {
SetupIndexers(ctx, operator.Manager)
Expand All @@ -213,6 +217,7 @@ func NewOperator(ctx context.Context, operator *operator.Operator) (context.Cont
PricingProvider: pricingProvider,
InstanceTypesProvider: instanceTypeProvider,
InstanceProvider: instanceProvider,
InstanceStatusProvider: instanceStatusProvider,
SSMProvider: ssmProvider,
CapacityReservationProvider: capacityReservationProvider,
EC2API: ec2api,
Expand Down
2 changes: 2 additions & 0 deletions pkg/aws/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type EC2API interface {
DescribeInstanceTypes(context.Context, *ec2.DescribeInstanceTypesInput, ...func(*ec2.Options)) (*ec2.DescribeInstanceTypesOutput, error)
DescribeInstanceTypeOfferings(context.Context, *ec2.DescribeInstanceTypeOfferingsInput, ...func(*ec2.Options)) (*ec2.DescribeInstanceTypeOfferingsOutput, error)
DescribeSpotPriceHistory(context.Context, *ec2.DescribeSpotPriceHistoryInput, ...func(*ec2.Options)) (*ec2.DescribeSpotPriceHistoryOutput, error)
DescribeInstanceStatus(context.Context, *ec2.DescribeInstanceStatusInput, ...func(*ec2.Options)) (*ec2.DescribeInstanceStatusOutput, error)
CreateFleet(context.Context, *ec2.CreateFleetInput, ...func(*ec2.Options)) (*ec2.CreateFleetOutput, error)
TerminateInstances(context.Context, *ec2.TerminateInstancesInput, ...func(*ec2.Options)) (*ec2.TerminateInstancesOutput, error)
DescribeInstances(context.Context, *ec2.DescribeInstancesInput, ...func(*ec2.Options)) (*ec2.DescribeInstancesOutput, error)
Expand Down Expand Up @@ -70,6 +71,7 @@ type SQSAPI interface {
ReceiveMessage(context.Context, *sqs.ReceiveMessageInput, ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
DeleteMessage(context.Context, *sqs.DeleteMessageInput, ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
SendMessage(context.Context, *sqs.SendMessageInput, ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
GetQueueUrl(context.Context, *sqs.GetQueueUrlInput, ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
}

type TimestreamWriteAPI interface {
Expand Down
7 changes: 6 additions & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
ssminvalidation "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/ssm/invalidation"
controllersversion "github.com/aws/karpenter-provider-aws/pkg/controllers/providers/version"
capacityreservationprovider "github.com/aws/karpenter-provider-aws/pkg/providers/capacityreservation"
"github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus"
"github.com/aws/karpenter-provider-aws/pkg/providers/launchtemplate"
"github.com/aws/karpenter-provider-aws/pkg/providers/version"

Expand Down Expand Up @@ -87,6 +88,7 @@ func NewControllers(
instanceTypeProvider *instancetype.DefaultProvider,
capacityReservationProvider capacityreservationprovider.Provider,
amiResolver amifamily.Resolver,
instanceStatusProvider instancestatus.Provider,
) []controller.Controller {
controllers := []controller.Controller{
nodeclasshash.NewController(kubeClient),
Expand All @@ -111,7 +113,10 @@ func NewControllers(
if options.FromContext(ctx).InterruptionQueue != "" {
sqsAPI := servicesqs.NewFromConfig(cfg)
prov, _ := sqs.NewSQSProvider(ctx, sqsAPI)
controllers = append(controllers, interruption.NewController(kubeClient, cloudProvider, clk, recorder, prov, sqsAPI, unavailableOfferings))
controllers = append(controllers, interruption.NewController(kubeClient, cloudProvider, clk, recorder, prov, sqsAPI, unavailableOfferings, instanceStatusProvider))
} else {
// if no queue is configured, start the interruption controller with only instance status monitoring
controllers = append(controllers, interruption.NewController(kubeClient, cloudProvider, clk, recorder, nil, nil, unavailableOfferings, instanceStatusProvider))
}
return controllers
}
139 changes: 110 additions & 29 deletions pkg/controllers/interruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sigs.k8s.io/karpenter/pkg/metrics"

ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
sqsapi "github.com/aws/aws-sdk-go-v2/service/sqs"
sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/awslabs/operatorpkg/reconciler"
"github.com/awslabs/operatorpkg/singleton"
Expand All @@ -43,9 +42,15 @@ import (

"sigs.k8s.io/karpenter/pkg/events"

lop "github.com/samber/lo/parallel"

sdk "github.com/aws/karpenter-provider-aws/pkg/aws"
"github.com/aws/karpenter-provider-aws/pkg/cache"
interruptionevents "github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/events"
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages"
"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages/instancestatusfailure"
"github.com/aws/karpenter-provider-aws/pkg/operator/options"
"github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus"
"github.com/aws/karpenter-provider-aws/pkg/providers/sqs"
)

Expand All @@ -56,19 +61,31 @@ const (
NoAction Action = "NoAction"
)

var (
// InstanceStatusInterval is used to rate limit calls to the EC2 DescribeInstanceStatus API
// since the Interruption controller runs in a hot loop with an SQS long poller sub-reconciler.
// Without this rate limit, if there are a lot of messages in the SQS queue, DescribeInstanceStatus
// could be called continuously since the long polling receives messages in small batches.
// It is also the requeue delay Reconcile returns when no interruption queue is configured.
InstanceStatusInterval = 30 * time.Second
)

// Controller is an AWS interruption controller.
// It continually polls an SQS queue for events from aws.ec2 and aws.health that
// trigger node health events or node spot interruption/rebalance events.
type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
clk clock.Clock
recorder events.Recorder
sqsProvider sqs.Provider
sqsAPI *sqsapi.Client
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
clk clock.Clock
recorder events.Recorder
// sqsProvider can be nil when a queue is not configured by the user
sqsProvider sqs.Provider
// sqsAPI can be nil when a queue is not configured by the user
sqsAPI sdk.SQSAPI
unavailableOfferingsCache *cache.UnavailableOfferings
instanceStatusProvider instancestatus.Provider
parser *EventParser
cm *pretty.ChangeMonitor
lastInstanceStatusRun time.Time
}

func NewController(
Expand All @@ -77,8 +94,9 @@ func NewController(
clk clock.Clock,
recorder events.Recorder,
sqsProvider sqs.Provider,
sqsAPI *sqsapi.Client,
sqsAPI sdk.SQSAPI,
unavailableOfferingsCache *cache.UnavailableOfferings,
instanceStatusProvider instancestatus.Provider,
) *Controller {
return &Controller{
kubeClient: kubeClient,
Expand All @@ -88,31 +106,76 @@ func NewController(
sqsProvider: sqsProvider,
sqsAPI: sqsAPI,
unavailableOfferingsCache: unavailableOfferingsCache,
instanceStatusProvider: instanceStatusProvider,
parser: NewEventParser(DefaultParsers...),
cm: pretty.NewChangeMonitor(),
}
}

func (c *Controller) Reconcile(ctx context.Context) (reconciler.Result, error) {
ctx = injection.WithControllerName(ctx, "interruption")
if c.sqsProvider == nil {
prov, err := sqs.NewSQSProvider(ctx, c.sqsAPI)
if err != nil {
log.FromContext(ctx).Error(err, "failed to create valid sqs provider")
return reconciler.Result{}, fmt.Errorf("creating sqs provider, %w", err)

reconcilers := []func(context.Context) error{
c.reconcileFromSQS,
c.reconcileInstanceStatus,
}

errs := make([]error, len(reconcilers))
lop.ForEach(reconcilers, func(r func(context.Context) error, i int) {
errs[i] = r(ctx)
})

if err := multierr.Combine(errs...); err != nil {
return reconciler.Result{}, fmt.Errorf("reconciling interruptions, %w", err)
}

if options.FromContext(ctx).InterruptionQueue != "" {
return reconciler.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}
return reconciler.Result{RequeueAfter: InstanceStatusInterval}, nil
}

// Register wires the interruption controller into the manager under the name
// "interruption". It is driven by the singleton source rather than by watches
// on Kubernetes objects, and the controller is adapted via singleton.AsReconciler.
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
	builder := controllerruntime.NewControllerManagedBy(m).
		Named("interruption").
		WatchesRawSource(singleton.Source())
	return builder.Complete(singleton.AsReconciler(c))
}

func (c *Controller) reconcileInstanceStatus(ctx context.Context) error {
// Pulling instance status more often can result in rate limiting when many SQS messages
// are received since the interruption controller runs in a hot loop
if c.clk.Since(c.lastInstanceStatusRun) < InstanceStatusInterval {
return nil
}
instanceStatuses, err := c.instanceStatusProvider.List(ctx)
if err != nil {
return fmt.Errorf("getting instance statuses %w", err)
}

errs := make([]error, len(instanceStatuses))
workqueue.ParallelizeUntil(ctx, 10, len(instanceStatuses), func(i int) {
if err := c.handleMessage(ctx, instancestatusfailure.Message(instanceStatuses[i])); err != nil {
errs[i] = fmt.Errorf("handling instance status check message, %w", err)
}
c.sqsProvider = prov
})
if err = multierr.Combine(errs...); err != nil {
return err
}
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("queue", c.sqsProvider.Name()))
if c.cm.HasChanged(c.sqsProvider.Name(), nil) {
log.FromContext(ctx).V(1).Info("watching interruption queue")
return nil
}

func (c *Controller) reconcileFromSQS(ctx context.Context) error {
if options.FromContext(ctx).InterruptionQueue == "" {
return nil
}
sqsMessages, err := c.sqsProvider.GetSQSMessages(ctx)
sqsMessages, err := c.sqsMessages(ctx)
if err != nil {
return reconciler.Result{}, fmt.Errorf("getting messages from queue, %w", err)
return err
}

if len(sqsMessages) == 0 {
return reconciler.Result{RequeueAfter: singleton.RequeueImmediately}, nil
return nil
}

errs := make([]error, len(sqsMessages))
Expand All @@ -131,16 +194,34 @@ func (c *Controller) Reconcile(ctx context.Context) (reconciler.Result, error) {
errs[i] = c.deleteMessage(ctx, sqsMessages[i])
})
if err = multierr.Combine(errs...); err != nil {
return reconciler.Result{}, err
return err
}
return reconciler.Result{RequeueAfter: singleton.RequeueImmediately}, nil
return nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("interruption").
WatchesRawSource(singleton.Source()).
Complete(singleton.AsReconciler(c))
// sqsMessages fetches the next batch of messages from the interruption queue.
//
// It returns (nil, nil) when the controller has no SQS client. When the SQS
// provider has not been created yet, it tries to create one on each call and
// stores it on the controller once creation succeeds.
func (c *Controller) sqsMessages(ctx context.Context) ([]*sqstypes.Message, error) {
	if c.sqsAPI == nil {
		return nil, nil
	}
	// Keep retrying provider construction: an earlier failure was most likely a
	// permissions issue, which can be fixed while the controller is running.
	if c.sqsProvider == nil {
		provider, provErr := sqs.NewSQSProvider(ctx, c.sqsAPI)
		if provErr != nil {
			log.FromContext(ctx).Error(provErr, "failed to create valid sqs provider")
			return nil, fmt.Errorf("creating sqs provider, %w", provErr)
		}
		c.sqsProvider = provider
	}

	// Attach the queue name to the logger carried by ctx for everything below.
	queueLogger := log.FromContext(ctx).WithValues("queue", c.sqsProvider.Name())
	ctx = log.IntoContext(ctx, queueLogger)
	if c.cm.HasChanged(c.sqsProvider.Name(), nil) {
		log.FromContext(ctx).V(1).Info("watching interruption queue")
	}

	received, recvErr := c.sqsProvider.GetSQSMessages(ctx)
	if recvErr != nil {
		return nil, fmt.Errorf("getting messages from queue, %w", recvErr)
	}
	return received, nil
}

// parseMessage parses the passed SQS message into an internal Message interface
Expand Down Expand Up @@ -256,7 +337,7 @@ func (c *Controller) notifyForMessage(msg messages.Message, nodeClaim *karpv1.No
case messages.RebalanceRecommendationKind:
c.recorder.Publish(interruptionevents.RebalanceRecommendation(n, nodeClaim)...)

case messages.ScheduledChangeKind:
case messages.ScheduledChangeKind, messages.InstanceStatusFailure:
c.recorder.Publish(interruptionevents.Unhealthy(n, nodeClaim)...)

case messages.SpotInterruptionKind:
Expand All @@ -274,7 +355,7 @@ func (c *Controller) notifyForMessage(msg messages.Message, nodeClaim *karpv1.No

func actionForMessage(msg messages.Message) Action {
switch msg.Kind() {
case messages.ScheduledChangeKind, messages.SpotInterruptionKind, messages.InstanceStoppedKind, messages.InstanceTerminatedKind:
case messages.ScheduledChangeKind, messages.SpotInterruptionKind, messages.InstanceStoppedKind, messages.InstanceTerminatedKind, messages.InstanceStatusFailure:
return CordonAndDrain
default:
return NoAction
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package instancestatusfailure

import (
"time"

"github.com/aws/karpenter-provider-aws/pkg/controllers/interruption/messages"
"github.com/aws/karpenter-provider-aws/pkg/providers/instancestatus"
)

// Message contains the Instance Status from EC2.DescribeInstanceStatus.
// This is not vended via EventBridge but is handled in a similar manner
// as other EventBridge messages.
//
// Message is a defined type whose underlying type is instancestatus.HealthStatus,
// so a HealthStatus value can be converted with Message(status).
type Message instancestatus.HealthStatus

// EC2InstanceIDs returns a one-element slice containing the InstanceID
// carried by this message.
func (m Message) EC2InstanceIDs() []string {
	ids := make([]string, 0, 1)
	return append(ids, m.InstanceID)
}

// Kind reports messages.InstanceStatusFailure for every Message value.
func (Message) Kind() messages.Kind {
	kind := messages.InstanceStatusFailure
	return kind
}

// StartTime returns the message's ImpairedSince timestamp.
func (m Message) StartTime() time.Time {
	impairedSince := m.ImpairedSince
	return impairedSince
}
1 change: 1 addition & 0 deletions pkg/controllers/interruption/messages/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
SpotInterruptionKind Kind = "spot_interrupted"
InstanceStoppedKind Kind = "instance_stopped"
InstanceTerminatedKind Kind = "instance_terminated"
InstanceStatusFailure Kind = "instance_status_failure"
NoOpKind Kind = "no_op"
)

Expand Down
Loading