Skip to content

Commit

Permalink
fix the security group issue
Browse files Browse the repository at this point in the history
  • Loading branch information
wwvela committed Jan 7, 2025
1 parent 906b8d9 commit 6a12928
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 75 deletions.
2 changes: 1 addition & 1 deletion e2e2/test/cases/nvidia-inference/bert_inference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestBertInference(t *testing.T) {
}
err := wait.For(
fwext.NewConditionExtension(cfg.Client().Resources()).JobSucceeded(job),
wait.WithTimeout(20*time.Minute),
wait.WithTimeout(60*time.Minute),
)
if err != nil {
t.Fatalf("[ERROR] BERT inference job did not succeed: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion e2e2/test/cases/nvidia-training/bert_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestBertTraining(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "bert-training-launcher", Namespace: "default"},
}
err := wait.For(fwext.NewConditionExtension(cfg.Client().Resources()).JobSucceeded(job),
wait.WithTimeout(time.Minute*20))
wait.WithTimeout(time.Minute*60))
if err != nil {
t.Fatal(err)
}
Expand Down
116 changes: 116 additions & 0 deletions kubetest2/internal/deployers/eksapi/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,20 @@ func (m *nodeManager) deleteUnmanagedNodegroup() error {
}
return fmt.Errorf("failed to delete unmanaged nodegroup stack: %w", err)
}

efaSecurityGroupID, err := m.getEFASecurityGroupIDFromStack(stackName)
if err != nil {
return fmt.Errorf("failed to get EFASecurityGroup ID from stack: %w", err)
}

if efaSecurityGroupID != "" {
klog.Infof("clean up leakage ENIs in EFA Security Group")
err = m.cleanupLeakageENIs(efaSecurityGroupID)
if err != nil {
return fmt.Errorf("failed to wait for ASG deletion: %w", err)
}
}

klog.Infof("waiting for unmanaged nodegroup stack to be deleted: %s", stackName)
err = cloudformation.NewStackDeleteCompleteWaiter(m.clients.CFN()).
Wait(context.TODO(),
Expand All @@ -630,6 +644,108 @@ func (m *nodeManager) deleteUnmanagedNodegroup() error {
return nil
}

// cleanupLeakageENIs removes network interfaces left behind in the EFA
// security group identified by efaSecurityGroupID.
//
// It first waits for the ASG named after m.resourceID to be deleted and only
// then deletes the ENIs still associated with the security group. Either step
// failing aborts the cleanup and returns the wrapped error.
func (m *nodeManager) cleanupLeakageENIs(efaSecurityGroupID string) error {
	klog.Infof("waiting for ASG in stack to be deleted: %s", m.resourceID)
	if waitErr := m.waitForASGDeletion(m.resourceID); waitErr != nil {
		return fmt.Errorf("failed to wait for ASG deletion: %w", waitErr)
	}

	klog.Infof("cleaning up ENIs attached to EFASecurityGroup: %s", efaSecurityGroupID)
	if cleanupErr := m.cleanupEFASecurityGroupENIs(efaSecurityGroupID); cleanupErr != nil {
		return fmt.Errorf("failed to clean up EFASecurityGroup ENIs: %w", cleanupErr)
	}
	return nil
}

// waitForASGDeletion polls until the Auto Scaling group named asgName no
// longer exists, checking every 10 seconds for at most 20 minutes.
//
// The first check runs immediately, so a group that is already gone does not
// cost a full polling interval. It returns nil once the group is reported as
// deleted, the wrapped error if a check fails, or a timeout error wrapping the
// context error when the deadline expires.
func (m *nodeManager) waitForASGDeletion(asgName string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
	defer cancel()
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		deleted, err := m.isASGDeleted(asgName)
		if err != nil {
			return fmt.Errorf("failed to check ASG deletion: %w", err)
		}
		if deleted {
			return nil
		}

		// Wait for the next poll tick or give up when the deadline passes.
		select {
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for ASG %s deletion: %w", asgName, ctx.Err())
		case <-ticker.C:
		}
	}
}

// isASGDeleted reports whether the Auto Scaling group named asgName is gone.
//
// It describes the group by name and treats an empty result list as deleted.
// A failing describe call is returned as a wrapped error alongside false.
func (m *nodeManager) isASGDeleted(asgName string) (bool, error) {
	input := &autoscaling.DescribeAutoScalingGroupsInput{
		AutoScalingGroupNames: []string{asgName},
	}
	resp, err := m.clients.ASG().DescribeAutoScalingGroups(context.TODO(), input)
	if err != nil {
		return false, fmt.Errorf("failed to describe ASG: %w", err)
	}
	return len(resp.AutoScalingGroups) == 0, nil
}

// cleanupEFASecurityGroupENIs deletes every network interface that is still
// associated with the security group efaSecurityGroupID.
//
// It stops at the first failure. The returned error names the security group
// when listing fails and the specific ENI when a deletion fails, so the
// message does not merely repeat the wrapped lookup error.
func (m *nodeManager) cleanupEFASecurityGroupENIs(efaSecurityGroupID string) error {
	enis, err := m.getSecurityGroupNetworkInterfaceIds(efaSecurityGroupID)
	if err != nil {
		return fmt.Errorf("failed to list ENIs for security group %s: %w", efaSecurityGroupID, err)
	}

	for _, eni := range enis {
		klog.Infof("deleting leaked ENI: %s", eni)
		_, err := m.clients.EC2().DeleteNetworkInterface(context.TODO(), &ec2.DeleteNetworkInterfaceInput{
			NetworkInterfaceId: aws.String(eni),
		})
		if err != nil {
			return fmt.Errorf("failed to delete leaked ENI %s: %w", eni, err)
		}
	}
	klog.Infof("deleted %d leaked ENI(s) attached to EFA security group!", len(enis))
	return nil
}

// getSecurityGroupNetworkInterfaceIds returns the IDs of the network
// interfaces associated with the security group efaSecurityGroupID, using the
// EC2 "group-id" filter.
//
// Interfaces returned without an ID are skipped rather than dereferenced, so
// a sparse response cannot cause a nil-pointer panic. A failing describe call
// is returned as a wrapped error with a nil slice.
func (m *nodeManager) getSecurityGroupNetworkInterfaceIds(efaSecurityGroupID string) ([]string, error) {
	output, err := m.clients.EC2().DescribeNetworkInterfaces(context.TODO(), &ec2.DescribeNetworkInterfacesInput{
		Filters: []ec2types.Filter{
			{
				Name:   aws.String("group-id"),
				Values: []string{efaSecurityGroupID},
			},
		},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to describe ENIs: %w", err)
	}

	enis := make([]string, 0, len(output.NetworkInterfaces))
	for _, eni := range output.NetworkInterfaces {
		if eni.NetworkInterfaceId == nil {
			continue
		}
		enis = append(enis, *eni.NetworkInterfaceId)
	}
	return enis, nil
}

// getEFASecurityGroupIDFromStack looks up the physical ID of the stack
// resource whose logical ID is "EFASecurityGroup" in the CloudFormation stack
// stackName.
//
// It returns an empty string and a nil error when the stack has no such
// resource, or when the resource has no physical ID. A failing describe call
// is returned as a wrapped error.
func (m *nodeManager) getEFASecurityGroupIDFromStack(stackName string) (string, error) {
	describeInput := cloudformation.DescribeStackResourcesInput{
		StackName: aws.String(stackName),
	}
	output, err := m.clients.CFN().DescribeStackResources(context.TODO(), &describeInput)
	if err != nil {
		return "", fmt.Errorf("failed to describe stack resources: %w", err)
	}
	for _, resource := range output.StackResources {
		// aws.ToString maps a nil pointer to "", avoiding a panic on
		// resources that lack a logical or physical ID.
		if aws.ToString(resource.LogicalResourceId) == "EFASecurityGroup" {
			return aws.ToString(resource.PhysicalResourceId), nil
		}
	}
	return "", nil
}

// getUnmanagedNodegroupStackName returns the name of the unmanaged nodegroup
// stack: m.resourceID followed by the "-unmanaged-nodegroup" suffix.
func (m *nodeManager) getUnmanagedNodegroupStackName() string {
	stackName := fmt.Sprintf("%s-unmanaged-nodegroup", m.resourceID)
	return stackName
}
Expand Down
Loading

0 comments on commit 6a12928

Please sign in to comment.