Add parallelism to janitor, stack status filter
cartermckinnon committed Jan 9, 2025
1 parent 83a0c80 commit 5a9f532
Showing 3 changed files with 81 additions and 31 deletions.
6 changes: 5 additions & 1 deletion cmd/kubetest2-eksapi-janitor/main.go
@@ -12,10 +12,14 @@ import (
 func main() {
     var maxResourceAge time.Duration
     flag.DurationVar(&maxResourceAge, "max-resource-age", time.Hour*3, "Maximum resource age")
+    var workers int
+    flag.IntVar(&workers, "workers", 1, "number of workers to process resources in parallel")
+    var stackStatus string
+    flag.StringVar(&stackStatus, "stack-status", "", "only process stacks with a specific status")
     var emitMetrics bool
     flag.BoolVar(&emitMetrics, "emit-metrics", false, "Send metrics to CloudWatch")
     flag.Parse()
-    j := eksapi.NewJanitor(maxResourceAge, emitMetrics)
+    j := eksapi.NewJanitor(maxResourceAge, emitMetrics, workers, stackStatus)
     if err := j.Sweep(context.Background()); err != nil {
         klog.Fatalf("failed to sweep resources: %v", err)
     }
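With the two new flags wired up above, a janitor run can bound its parallelism and target stacks in a particular state. A hypothetical invocation (the binary name is assumed from the cmd/kubetest2-eksapi-janitor path, and CREATE_FAILED is just one example CloudFormation stack status):

kubetest2-eksapi-janitor --max-resource-age=3h --workers=8 --stack-status=CREATE_FAILED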
5 changes: 5 additions & 0 deletions internal/deployers/eksapi/infra.go
@@ -253,6 +253,11 @@ func (m *InfrastructureManager) deleteInfrastructureStack() error {
 // because this will block node role deletion (and deletion of the infrastructure stack).
 // For example, when --auto-mode is used, an instance profile will be created for us and won't be deleted automatically with the cluster.
 func (m *InfrastructureManager) deleteLeakedInstanceProfiles(infra *Infrastructure) error {
+    if infra.nodeRoleName == "" {
+        // If the infra stack failed to create, it can be left in a partial state with no node role.
+        // There are no instance profiles to clean up in that case, so there's nothing to do.
+        return nil
+    }
     out, err := m.clients.IAM().ListInstanceProfilesForRole(context.TODO(), &iam.ListInstanceProfilesForRoleInput{
         RoleName: aws.String(infra.nodeRoleName),
     })
101 changes: 71 additions & 30 deletions internal/deployers/eksapi/janitor.go
@@ -5,6 +5,7 @@ import (
     "errors"
     "fmt"
     "strings"
+    "sync"
     "time"

     "github.com/aws/aws-k8s-tester/internal/awssdk"
@@ -16,16 +17,21 @@ import (
     "k8s.io/klog/v2"
 )

-func NewJanitor(maxResourceAge time.Duration, emitMetrics bool) *janitor {
+func NewJanitor(maxResourceAge time.Duration, emitMetrics bool, workers int, stackStatus string) *janitor {
     awsConfig := awssdk.NewConfig()
     var metricRegistry metrics.MetricRegistry
     if emitMetrics {
         metricRegistry = metrics.NewCloudWatchRegistry(cloudwatch.NewFromConfig(awsConfig))
     } else {
         metricRegistry = metrics.NewNoopMetricRegistry()
     }
+    if workers <= 0 {
+        workers = 1
+    }
     return &janitor{
         maxResourceAge: maxResourceAge,
+        workers:        workers,
+        stackStatus:    stackStatus,
         awsConfig:      awsConfig,
         cfnClient:      cloudformation.NewFromConfig(awsConfig),
         metrics:        metricRegistry,
@@ -34,6 +40,8 @@ func NewJanitor(maxResourceAge time.Duration, emitMetrics bool) *janitor {

 type janitor struct {
     maxResourceAge time.Duration
+    workers        int
+    stackStatus    string

     awsConfig aws.Config
     cfnClient *cloudformation.Client
@@ -43,40 +51,73 @@ type janitor struct {
 func (j *janitor) Sweep(ctx context.Context) error {
     awsConfig := awssdk.NewConfig()
     cfnClient := cloudformation.NewFromConfig(awsConfig)
-    stacks := cloudformation.NewDescribeStacksPaginator(cfnClient, &cloudformation.DescribeStacksInput{})
+    stacks, err := j.getStacks(ctx, cfnClient)
+    if err != nil {
+        return fmt.Errorf("failed to get stacks: %v", err)
+    }
+    var wg sync.WaitGroup
+    stackQueue := make(chan cloudformationtypes.Stack, len(stacks))
+    errChan := make(chan error, len(stacks))
+    for i := 1; i <= j.workers; i++ {
+        wg.Add(1)
+        go j.sweepWorker(&wg, stackQueue, errChan)
+    }
+
+    for _, stack := range stacks {
+        stackQueue <- stack
+    }
+    close(stackQueue)
+
+    wg.Wait()
+    close(errChan)
     var errs []error
-    for stacks.HasMorePages() {
-        page, err := stacks.NextPage(ctx)
+    for err := range errChan {
+        errs = append(errs, err)
+    }
+    return errors.Join(errs...)
+}
+
+func (j *janitor) getStacks(ctx context.Context, cfnClient *cloudformation.Client) ([]cloudformationtypes.Stack, error) {
+    var stacks []cloudformationtypes.Stack
+    stackPaginator := cloudformation.NewDescribeStacksPaginator(cfnClient, &cloudformation.DescribeStacksInput{})
+    for stackPaginator.HasMorePages() {
+        page, err := stackPaginator.NextPage(ctx)
         if err != nil {
-            return err
-        }
-        for _, stack := range page.Stacks {
-            resourceID := *stack.StackName
-            if !strings.HasPrefix(resourceID, ResourcePrefix) {
-                continue
-            }
-            if stack.StackStatus == "DELETE_COMPLETE" {
-                continue
-            }
-            resourceAge := time.Since(*stack.CreationTime)
-            if resourceAge < j.maxResourceAge {
-                klog.Infof("skipping resources (%v old): %s", resourceAge, resourceID)
-                continue
-            }
-            clients := j.awsClientsForStack(stack)
-            infraManager := NewInfrastructureManager(clients, resourceID, j.metrics)
-            clusterManager := NewClusterManager(clients, resourceID)
-            nodeManager := NewNodeManager(clients, resourceID)
-            klog.Infof("deleting resources (%v old): %s", resourceAge, resourceID)
-            if err := deleteResources(infraManager, clusterManager, nodeManager /* TODO: pass a k8sClient */, nil, nil); err != nil {
-                errs = append(errs, fmt.Errorf("failed to delete resources: %s: %v", resourceID, err))
-            }
+            return nil, err
         }
+        stacks = append(stacks, page.Stacks...)
     }
-    if len(errs) > 0 {
-        return errors.Join(errs...)
+    return stacks, nil
+}
+
+func (j *janitor) sweepWorker(wg *sync.WaitGroup, stackQueue <-chan cloudformationtypes.Stack, errChan chan<- error) {
+    defer wg.Done()
+    for stack := range stackQueue {
+        resourceID := *stack.StackName
+        if !strings.HasPrefix(resourceID, ResourcePrefix) {
+            continue
+        }
+        if stack.StackStatus == "DELETE_COMPLETE" {
+            continue
+        }
+        if j.stackStatus != "" && j.stackStatus != string(stack.StackStatus) {
+            klog.Infof("skipping resources (status: %v): %s", stack.StackStatus, resourceID)
+            continue
+        }
+        resourceAge := time.Since(*stack.CreationTime)
+        if resourceAge < j.maxResourceAge {
+            klog.Infof("skipping resources (%v old): %s", resourceAge, resourceID)
+            continue
+        }
+        clients := j.awsClientsForStack(stack)
+        infraManager := NewInfrastructureManager(clients, resourceID, j.metrics)
+        clusterManager := NewClusterManager(clients, resourceID)
+        nodeManager := NewNodeManager(clients, resourceID)
+        klog.Infof("deleting resources (%v old): %s", resourceAge, resourceID)
+        if err := deleteResources(infraManager, clusterManager, nodeManager /* TODO: pass a k8sClient */, nil, nil); err != nil {
+            errChan <- fmt.Errorf("failed to delete resources: %s: %v", resourceID, err)
+        }
     }
-    return nil
 }

 func (j *janitor) awsClientsForStack(stack cloudformationtypes.Stack) *awsClients {
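The reworked Sweep separates listing from deletion: getStacks gathers every CloudFormation stack up front, Sweep pushes them onto a buffered channel, and a fixed pool of sweepWorker goroutines drains the channel, applies the prefix/status/age filters, and reports failures on an error channel that is joined into a single error at the end. A minimal, standalone sketch of that fan-out pattern, using generic string work items and a made-up process helper instead of the real stack deletion:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// process stands in for the per-stack deletion work; it fails on one item
// so the error-collection path is exercised.
func process(item string) error {
    if item == "stack-2" {
        return fmt.Errorf("failed to delete resources: %s", item)
    }
    fmt.Println("deleted", item)
    return nil
}

func main() {
    items := []string{"stack-1", "stack-2", "stack-3", "stack-4"}
    workers := 2

    // Buffer both channels to the size of the work list, as the new Sweep does,
    // so enqueuing work and reporting errors never block.
    queue := make(chan string, len(items))
    errChan := make(chan error, len(items))

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range queue {
                if err := process(item); err != nil {
                    errChan <- err
                }
            }
        }()
    }

    // Enqueue everything, then close the queue so the workers' range loops end.
    for _, item := range items {
        queue <- item
    }
    close(queue)

    wg.Wait()
    close(errChan)

    // Drain the error channel and join the results into a single error.
    var errs []error
    for err := range errChan {
        errs = append(errs, err)
    }
    if err := errors.Join(errs...); err != nil {
        fmt.Println("sweep finished with errors:", err)
    }
}

Buffering both channels to the size of the work list mirrors the janitor's choice: producers never block while enqueuing and workers never block while reporting errors, so close on the queue plus the WaitGroup is enough for an orderly shutdown.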
