Skip to content

Commit

Permalink
Merge pull request vmware-tanzu#607 from zhengxiexie/zhengxie/refacto…
Browse files Browse the repository at this point in the history
…r_gc_2

Refactor GC
  • Loading branch information
zhengxiexie authored Jun 24, 2024
2 parents 566bc19 + eb2323c commit 506e4fe
Show file tree
Hide file tree
Showing 18 changed files with 362 additions and 475 deletions.
6 changes: 6 additions & 0 deletions pkg/controllers/common/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"time"

ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -41,3 +42,8 @@ const (
ReasonFailDelete = "FailDelete"
ReasonFailUpdate = "FailUpdate"
)

// GarbageCollector interface with collectGarbage method
type GarbageCollector interface {
CollectGarbage(ctx context.Context)
}
20 changes: 20 additions & 0 deletions pkg/controllers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"sync"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -141,3 +142,22 @@ func GetVirtualMachineNameForSubnetPort(subnetPort *v1alpha1.SubnetPort) (string
func NumReconcile() int {
return MaxConcurrentReconciles
}

func GenericGarbageCollector(cancel chan bool, timeout time.Duration, f func(ctx context.Context)) {
ctx := context.Background()
ticker := time.NewTicker(timeout)
defer ticker.Stop()

for {
select {
case <-cancel:
return
case <-ticker.C:
f(ctx)
}
}
}

func GcOnce(gc GarbageCollector, once *sync.Once) {
once.Do(func() { go GenericGarbageCollector(make(chan bool), servicecommon.GCInterval, gc.CollectGarbage) })
}
59 changes: 23 additions & 36 deletions pkg/controllers/ippool/ippool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"regexp"
"sync"
"time"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -117,7 +116,8 @@ func (r *IPPoolReconciler) setReadyStatusTrue(ctx *context.Context, ippool *v1al

func (r *IPPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Use once.Do to ensure gc is called only once
once.Do(func() { go r.IPPoolGarbageCollector(make(chan bool), servicecommon.GCInterval) })
common.GcOnce(r, &once)

obj := &v1alpha2.IPPool{}
log.Info("reconciling ippool CR", "ippool", req.NamespacedName)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerSyncTotal, MetricResType)
Expand Down Expand Up @@ -267,44 +267,31 @@ func (r *IPPoolReconciler) Start(mgr ctrl.Manager) error {
return nil
}

// IPPoolGarbageCollector collect ippool which has been removed from crd.
// cancel is used to break the loop during UT
func (r *IPPoolReconciler) IPPoolGarbageCollector(cancel chan bool, timeout time.Duration) {
ctx := context.Background()
// CollectGarbage implements the interface GarbageCollector method.
func (r *IPPoolReconciler) CollectGarbage(ctx context.Context) {
log.Info("ippool garbage collector started")
for {
select {
case <-cancel:
return
case <-time.After(timeout):
}
nsxIPPoolSet := r.Service.ListIPPoolID()
if len(nsxIPPoolSet) == 0 {
continue
}
ipPoolList := &v1alpha2.IPPoolList{}
err := r.Client.List(ctx, ipPoolList)
if err != nil {
log.Error(err, "failed to list ip pool CR")
continue
}
nsxIPPoolSet := r.Service.ListIPPoolID()
if len(nsxIPPoolSet) == 0 {
return
}

CRIPPoolSet := sets.NewString()
for _, ipp := range ipPoolList.Items {
CRIPPoolSet.Insert(string(ipp.UID))
}
ipPoolList := &v1alpha2.IPPoolList{}
if err := r.Client.List(ctx, ipPoolList); err != nil {
log.Error(err, "failed to list ippool CR")
return
}
CRIPPoolSet := sets.New[string]()
for _, ipa := range ipPoolList.Items {
CRIPPoolSet.Insert(string(ipa.UID))
}

log.V(2).Info("ippool garbage collector", "nsxIPPoolSet", nsxIPPoolSet, "CRIPPoolSet", CRIPPoolSet)
log.V(2).Info("ippool garbage collector", "nsxIPPoolSet", nsxIPPoolSet, "CRIPPoolSet", CRIPPoolSet)

for elem := range nsxIPPoolSet {
if CRIPPoolSet.Has(elem) {
continue
}
log.Info("GC collected ip pool CR", "UID", elem)
err = r.Service.DeleteIPPool(types.UID(elem))
if err != nil {
log.Error(err, "failed to delete ip pool CR", "UID", elem)
}
diffSet := nsxIPPoolSet.Difference(CRIPPoolSet)
for elem := range diffSet {
log.Info("GC collected ippool CR", "UID", elem)
if err := r.Service.DeleteIPPool(types.UID(elem)); err != nil {
log.Error(err, "failed to delete ippool CR", "UID", elem)
}
}
}
26 changes: 9 additions & 17 deletions pkg/controllers/ippool/ippool_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"context"
"errors"
"reflect"
"sync"
"testing"
"time"

"github.com/agiledragon/gomonkey/v2"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -114,6 +114,11 @@ func TestIPPoolReconciler_Reconcile(t *testing.T) {
ctx := context.Background()
req := controllerruntime.Request{NamespacedName: types.NamespacedName{Namespace: "dummy", Name: "dummy"}}

// common.GcOnce do nothing
var once sync.Once
pat := gomonkey.ApplyMethod(reflect.TypeOf(&once), "Do", func(_ *sync.Once, _ func()) {})
defer pat.Reset()

// not found
errNotFound := errors.New("not found")
k8sClient.EXPECT().Get(ctx, gomock.Any(), gomock.Any()).Return(errNotFound)
Expand Down Expand Up @@ -244,7 +249,6 @@ func TestReconciler_GarbageCollector(t *testing.T) {
patch.ApplyMethod(reflect.TypeOf(service), "DeleteIPPool", func(_ *ippool.IPPoolService, UID interface{}) error {
return nil
})
cancel := make(chan bool)
defer patch.Reset()
mockCtl := gomock.NewController(t)
k8sClient := mock_client.NewMockClient(mockCtl)
Expand All @@ -263,11 +267,7 @@ func TestReconciler_GarbageCollector(t *testing.T) {
a.Items[0].UID = "1234"
return nil
})
go func() {
time.Sleep(1 * time.Second)
cancel <- true
}()
r.IPPoolGarbageCollector(cancel, time.Second)
r.CollectGarbage(ctx)

// local store has same item as k8s cache
patch.Reset()
Expand All @@ -287,11 +287,7 @@ func TestReconciler_GarbageCollector(t *testing.T) {
a.Items[0].UID = "1234"
return nil
})
go func() {
time.Sleep(1 * time.Second)
cancel <- true
}()
r.IPPoolGarbageCollector(cancel, time.Second)
r.CollectGarbage(ctx)

// local store has no item
patch.Reset()
Expand All @@ -304,11 +300,7 @@ func TestReconciler_GarbageCollector(t *testing.T) {
return nil
})
k8sClient.EXPECT().List(ctx, policyList).Return(nil).Times(0)
go func() {
time.Sleep(1 * time.Second)
cancel <- true
}()
r.IPPoolGarbageCollector(cancel, time.Second)
r.CollectGarbage(ctx)
}

func TestReconciler_Start(t *testing.T) {
Expand Down
81 changes: 35 additions & 46 deletions pkg/controllers/networkinfo/networkinfo_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package networkinfo
import (
"context"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
apimachineryruntime "k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -44,7 +43,7 @@ type NetworkInfoReconciler struct {

func (r *NetworkInfoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Use once.Do to ensure gc is called only once
once.Do(func() { go r.GarbageCollector(make(chan bool), commonservice.GCInterval) })
common.GcOnce(r, &once)

obj := &v1alpha1.NetworkInfo{}
log.Info("reconciling NetworkInfo CR", "NetworkInfo", req.NamespacedName)
Expand Down Expand Up @@ -212,59 +211,49 @@ func (r *NetworkInfoReconciler) Start(mgr ctrl.Manager) error {
return nil
}

// GarbageCollector logic for nsx-vpc is that:
// CollectGarbage logic for nsx-vpc is that:
// 1. list all current existing namespace in kubernetes
// 2. list all the nsx-vpc in vpcStore
// 3. loop all the nsx-vpc to get its namespace, check if the namespace still exist
// 4. if ns do not exist anymore, delete the nsx-vpc resource
func (r *NetworkInfoReconciler) GarbageCollector(cancel chan bool, timeout time.Duration) {
ctx := context.Background()
// it implements the interface GarbageCollector method.
func (r *NetworkInfoReconciler) CollectGarbage(ctx context.Context) {
log.Info("VPC garbage collector started")
for {
select {
case <-cancel:
return
case <-time.After(timeout):
}
// read all nsx-vpc from vpc store
nsxVPCList := r.Service.ListVPC()
if len(nsxVPCList) == 0 {
continue
}
// read all nsx-vpc from vpc store
nsxVPCList := r.Service.ListVPC()
if len(nsxVPCList) == 0 {
return
}

// read all namespaces from k8s
namespaces := &corev1.NamespaceList{}
err := r.Client.List(ctx, namespaces)
if err != nil {
log.Error(err, "failed to list k8s namespaces")
continue
}
// read all namespaces from k8s
namespaces := &corev1.NamespaceList{}
err := r.Client.List(ctx, namespaces)
if err != nil {
log.Error(err, "failed to list k8s namespaces")
return
}
nsSet := sets.NewString()
for _, ns := range namespaces.Items {
nsSet.Insert(ns.Name)
}

nsSet := sets.NewString()
for _, ns := range namespaces.Items {
nsSet.Insert(ns.Name)
for i := len(nsxVPCList) - 1; i >= 0; i-- {
nsxVPCNamespace := getNamespaceFromNSXVPC(&nsxVPCList[i])
if nsSet.Has(nsxVPCNamespace) {
continue
}
for _, elem := range nsxVPCList {
// for go lint Implicit memory aliasing in for loop
// this limitation is fixed after golang 1.22, should remove the temp var after upgrading to 1.22
tempElem := elem
nsxVPCNamespace := getNamespaceFromNSXVPC(&tempElem)
if nsSet.Has(nsxVPCNamespace) {
continue
}

log.V(1).Info("GC collected nsx VPC object", "ID", elem.Id, "Namespace", nsxVPCNamespace)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, common.MetricResTypeNetworkInfo)
err = r.Service.DeleteVPC(*elem.Path)
if err != nil {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, common.MetricResTypeNetworkInfo)
} else {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteSuccessTotal, common.MetricResTypeNetworkInfo)
if err := r.Service.DeleteIPBlockInVPC(elem); err != nil {
log.Error(err, "failed to delete private ip blocks for VPC", "VPC", *elem.DisplayName)
}
log.Info("deleted private ip blocks for VPC", "VPC", *elem.DisplayName)
elem := nsxVPCList[i]
log.Info("GC collected nsx VPC object", "ID", elem.Id, "Namespace", nsxVPCNamespace)
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteTotal, common.MetricResTypeNetworkInfo)
err = r.Service.DeleteVPC(*elem.Path)
if err != nil {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteFailTotal, common.MetricResTypeNetworkInfo)
} else {
metrics.CounterInc(r.Service.NSXConfig, metrics.ControllerDeleteSuccessTotal, common.MetricResTypeNetworkInfo)
if err := r.Service.DeleteIPBlockInVPC(elem); err != nil {
log.Error(err, "failed to delete private ip blocks for VPC", "VPC", *elem.DisplayName)
}
log.Info("deleted private ip blocks for VPC", "VPC", *elem.DisplayName)
}
}
}
Loading

0 comments on commit 506e4fe

Please sign in to comment.