From f3c7712143a7f738c28219c95136e0f6ee7b99a4 Mon Sep 17 00:00:00 2001 From: renxiangyu_yewu Date: Sat, 8 Feb 2025 11:05:10 +0800 Subject: [PATCH] fix: get calico blockaffinity by etcd Signed-off-by: renxiangyu_yewu --- .../calicoippool/calicoippool_controller.go | 4 +- .../blockaffinitysyncer.go | 5 +- .../blockeventhandler.go | 32 ++- .../nodecidr/adaper/calico_etcd.go | 40 +++- .../nodecidr/adaper/calico_etcd_test.go | 226 ++++++++++++++++++ .../nodecidr/nodecidr_controller.go | 1 + 6 files changed, 287 insertions(+), 21 deletions(-) rename pkg/clusterlink/controllers/nodecidr/adaper/{blockwatchsyncer => }/blockaffinitysyncer.go (74%) rename pkg/clusterlink/controllers/nodecidr/adaper/{blockwatchsyncer => }/blockeventhandler.go (59%) create mode 100644 pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd_test.go diff --git a/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go b/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go index 8a11856f4..18757931c 100644 --- a/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go +++ b/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go @@ -465,14 +465,14 @@ func syncIPPool(currentClusterName string, globalExtIPPoolSet ExternalIPPoolSet, } } - klog.V(4).Infof("cluster %s has %d ippools to delete, they are", currentClusterName, len(deleteIPPool), deleteIPPool) + klog.V(4).Infof("cluster %s has %d ippools to delete, they are %v", currentClusterName, len(deleteIPPool), deleteIPPool) err = client.DeleteIPPool(deleteIPPool) if err != nil { klog.Errorf("cluster %s delete ippool err: %v", currentClusterName, err) return err } - klog.V(4).Infof("cluster %s has %d ippools to modify, they are", currentClusterName, len(modifyIPPool), modifyIPPool) + klog.V(4).Infof("cluster %s has %d ippools to modify, they are %v", currentClusterName, len(modifyIPPool), modifyIPPool) err = client.CreateOrUpdateCalicoIPPool(modifyIPPool) if err != nil { klog.Errorf("cluster %s modify ippool err: %v", currentClusterName, err) diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockaffinitysyncer.go b/pkg/clusterlink/controllers/nodecidr/adaper/blockaffinitysyncer.go similarity index 74% rename from pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockaffinitysyncer.go rename to pkg/clusterlink/controllers/nodecidr/adaper/blockaffinitysyncer.go index c33aaf081..4600ad4eb 100644 --- a/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockaffinitysyncer.go +++ b/pkg/clusterlink/controllers/nodecidr/adaper/blockaffinitysyncer.go @@ -1,7 +1,6 @@ -package blockwatchsyncer +package adaper import ( - apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3" "github.com/projectcalico/calico/libcalico-go/lib/backend/api" "github.com/projectcalico/calico/libcalico-go/lib/backend/model" "github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer" @@ -11,7 +10,7 @@ import ( func NewBlockWatchSyncer(client api.Client, callbacks api.SyncerCallbacks) api.Syncer { resourceTypes := []watchersyncer.ResourceType{ { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindBlockAffinity}, + ListInterface: model.BlockAffinityListOptions{}, }, } diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockeventhandler.go b/pkg/clusterlink/controllers/nodecidr/adaper/blockeventhandler.go similarity index 59% rename from pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockeventhandler.go rename to pkg/clusterlink/controllers/nodecidr/adaper/blockeventhandler.go index a1288b300..e7a30f4cd 100644 --- a/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockeventhandler.go +++ b/pkg/clusterlink/controllers/nodecidr/adaper/blockeventhandler.go @@ -1,12 +1,15 @@ -package blockwatchsyncer +package adaper import ( "time" "github.com/projectcalico/calico/libcalico-go/lib/backend/api" + "github.com/projectcalico/calico/libcalico-go/lib/backend/model" + log "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" + clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) @@ -17,17 +20,21 @@ type BlockEventHandler struct { // Channel for getting updates and status updates from syncer. syncerC chan interface{} - processor lifted.AsyncWorker - // Channel to indicate node status reporter routine is not needed anymore. + processor lifted.AsyncWorker + clusterNodeLister clusterlister.ClusterNodeLister + // Channel that notifies the Run method to exit done chan struct{} // Flag to show we are in-sync. inSync bool } -func NewBlockEventHandler(processor lifted.AsyncWorker) *BlockEventHandler { +func NewBlockEventHandler(processor lifted.AsyncWorker, clusterNodeLister clusterlister.ClusterNodeLister) *BlockEventHandler { return &BlockEventHandler{ - processor: processor, + processor: processor, + clusterNodeLister: clusterNodeLister, + syncerC: make(chan interface{}, 100), + done: make(chan struct{}), } } @@ -71,9 +78,20 @@ func (b *BlockEventHandler) OnUpdates(updates []api.Update) { b.syncerC <- updates } -// todo put etcd's event info AsyncWorker's queue -func (b *BlockEventHandler) onupdate(_ []api.Update) { +func (b *BlockEventHandler) onupdate(event []api.Update) { + klog.V(7).Info("update event") + for _, update := range event { + blockAffinityKey, ok := update.Key.(model.BlockAffinityKey) + if !ok { + log.Errorf("Failed to cast object to blockAffinityKey: %+v", update.Key) + return + } + + node := blockAffinityKey.Host + requeue(node, b.clusterNodeLister, b.processor) + klog.V(4).Infof("Processing blockAffinityKey update: %+v", update.Key) + } } func (b *BlockEventHandler) WaitForCacheSync(stopCh <-chan struct{}) bool { diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go b/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go index 7dbf10e19..1c6831e6c 100644 --- a/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go +++ b/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go @@ -1,10 +1,14 @@ package adaper import ( + "context" + "fmt" + "strings" + "github.com/projectcalico/calico/libcalico-go/lib/backend/api" + "github.com/projectcalico/calico/libcalico-go/lib/backend/model" "k8s.io/klog/v2" - "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer" clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) @@ -29,10 +33,10 @@ func NewCalicoETCDAdapter(etcdClient api.Client, } func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error { - blockEventHandler := blockwatchsyncer.NewBlockEventHandler(c.processor) - c.watchSyncer = blockwatchsyncer.NewBlockWatchSyncer(c.etcdClient, blockEventHandler) + blockEventHandler := NewBlockEventHandler(c.processor, c.clusterNodeLister) + c.watchSyncer = NewBlockWatchSyncer(c.etcdClient, blockEventHandler) c.watchSyncer.Start() - blockEventHandler.Run(stopCh) + go blockEventHandler.Run(stopCh) blockEventHandler.WaitForCacheSync(stopCh) c.sync = true @@ -40,11 +44,29 @@ func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error { return nil } -func (c *CalicoETCDAdapter) GetCIDRByNodeName(_ string) ([]string, error) { - // see calicoctl/calicoctl/commands/datastore/migrate/migrateipam.go - // and libcalico-go/lib/backend/model/block_affinity.go - // todo use c.etcdClient to get blockaffinity in etcd - return nil, nil +func (c *CalicoETCDAdapter) GetCIDRByNodeName(nodeName string) ([]string, error) { + var podCIDRS []string + + ctx := context.Background() + + blockAffinityKVList, err := c.etcdClient.List(ctx, model.BlockAffinityListOptions{}, "") + if err != nil { + return nil, err + } + + for _, item := range blockAffinityKVList.KVPairs { + etcdBlockAffinityKey, ok := item.Key.(model.BlockAffinityKey) + if !ok { + return nil, fmt.Errorf("error converting Key to BlockAffinityKey: %+v", item.Key) + } + + if strings.Compare(etcdBlockAffinityKey.Host, nodeName) == 0 { + podCIDRS = append(podCIDRS, etcdBlockAffinityKey.CIDR.String()) + klog.V(4).Infof("BlockAffinityKey %v CIDR appended, nodeName: %s", etcdBlockAffinityKey, nodeName) + } + } + + return podCIDRS, nil } func (c *CalicoETCDAdapter) Synced() bool { diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd_test.go b/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd_test.go new file mode 100644 index 000000000..fa25d8de9 --- /dev/null +++ b/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd_test.go @@ -0,0 +1,226 @@ +package adaper + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/projectcalico/calico/libcalico-go/lib/apiconfig" + "github.com/projectcalico/calico/libcalico-go/lib/backend/api" + "github.com/projectcalico/calico/libcalico-go/lib/backend/etcdv3" + "github.com/projectcalico/calico/libcalico-go/lib/backend/model" + "github.com/projectcalico/calico/libcalico-go/lib/net" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +type MockEtcdClient struct { + mock.Mock +} + +func (m *MockEtcdClient) Create(ctx context.Context, object *model.KVPair) (*model.KVPair, error) { + args := m.Called(ctx, object) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*model.KVPair), args.Error(1) +} + +func (m *MockEtcdClient) Update(ctx context.Context, object *model.KVPair) (*model.KVPair, error) { + args := m.Called(ctx, object) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*model.KVPair), args.Error(1) +} + +func (m *MockEtcdClient) Apply(ctx context.Context, object *model.KVPair) (*model.KVPair, error) { + args := m.Called(ctx, object) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*model.KVPair), args.Error(1) +} + +func (m *MockEtcdClient) Delete(ctx context.Context, key model.Key, revision string) (*model.KVPair, error) { + args := m.Called(ctx, key, revision) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*model.KVPair), args.Error(1) +} + +func (m *MockEtcdClient) DeleteKVP(ctx context.Context, object *model.KVPair) (*model.KVPair, error) { + args := m.Called(ctx, object) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*model.KVPair), args.Error(1) +} + +func (m *MockEtcdClient) Get(ctx context.Context, key model.Key, revision string) (*model.KVPair, error) { + args := m.Called(ctx, key, revision) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*model.KVPair), args.Error(1) +} + +func (m *MockEtcdClient) List(ctx context.Context, list model.ListInterface, revision string) (*model.KVPairList, error) { + args := m.Called(ctx, list, revision) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*model.KVPairList), args.Error(1) +} + +func (m *MockEtcdClient) Watch(ctx context.Context, list model.ListInterface, revision string) (api.WatchInterface, error) { + args := m.Called(ctx, list, revision) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(api.WatchInterface), args.Error(1) +} + +func (m *MockEtcdClient) EnsureInitialized() error { + args := m.Called() + return args.Error(0) +} + +func (m *MockEtcdClient) Clean() error { + args := m.Called() + return args.Error(0) +} + +func TestGetCIDRByNodeName(t *testing.T) { + mockClient := new(MockEtcdClient) + adapter := &CalicoETCDAdapter{etcdClient: mockClient} + + expectedCIDR := "10.1.1.0/26" + mockBlockAffinityKey := model.BlockAffinityKey{ + CIDR: *mustParseCIDR(expectedCIDR), + Host: "node1", + } + + mockKVPair := &model.KVPair{ + Key: mockBlockAffinityKey, + } + + mockClient.On("List", mock.Anything, mock.Anything, mock.Anything). + Return(&model.KVPairList{KVPairs: []*model.KVPair{mockKVPair}}, nil) + + tests := []struct { + name string + nodeName string + expectCidrs []string + }{ + { + name: "valid node", + nodeName: "node1", + expectCidrs: []string{"10.1.1.0/26"}, + }, + { + name: "invalid node", + nodeName: "no-nodeName", + expectCidrs: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cidrs, err := adapter.GetCIDRByNodeName(tt.nodeName) + assert.NoError(t, err) + assert.Equal(t, tt.expectCidrs, cidrs) + }) + } + mockClient.AssertExpectations(t) +} + +func mustParseCIDR(cidr string) *net.IPNet { + _, parsedCIDR, _ := net.ParseCIDR(cidr) + return parsedCIDR +} + +func TestEtcdV3ClientWithMock(t *testing.T) { + tests := []struct { + name string + cidr string + nodeName string + expectValue model.BlockAffinity + expectErr error + }{ + { + name: "test1", + cidr: "10.1.1.0/26", + nodeName: "node1", + expectValue: model.BlockAffinity{ + State: "confirmed", + Deleted: false, + }, + expectErr: nil, + }, + { + name: "test2", + cidr: "10.1.1.0/26", + nodeName: "node2", + expectValue: model.BlockAffinity{}, + expectErr: errors.New("key not found"), + }, + } + + mockEtcd := new(MockEtcdClient) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _, cidr, err := net.ParseCIDR(test.cidr) + if err != nil { + t.Fatalf("Failed to parse CIDR: %v", err) + } + key := model.BlockAffinityKey{ + Host: test.nodeName, + CIDR: *cidr, + } + + expect := test.expectValue + mockValue := &model.KVPair{ + Value: &expect, + } + mockEtcd.On("Get", ctx, key, "").Return(mockValue, test.expectErr) + + get, err := mockEtcd.Get(ctx, key, "") + if err != nil { + assert.Equal(t, test.expectErr.Error(), err.Error()) + return + } + + if get != nil { + value, ok := get.Value.(*model.BlockAffinity) + if !ok { + t.Errorf("Value is not BlockAffinity, err:%v\n", err) + } + assert.Equal(t, test.expectValue.State, value.State) + assert.Equal(t, test.expectValue.Deleted, value.Deleted) + } + + mockEtcd.AssertExpectations(t) + }) + } +} + +// nolint +func createEtcdClient() (api.Client, error) { + etcdConfig := apiconfig.EtcdConfig{ + EtcdEndpoints: "127.0.0.1:2379", + EtcdKeyFile: "/ssl/server.key", + EtcdCertFile: "/ssl/server.crt", + EtcdCACertFile: "/ssl/ca.crt", + } + etcdV3Client, err := etcdv3.NewEtcdV3Client(&etcdConfig) + if err != nil { + return nil, err + } + return etcdV3Client, nil +} diff --git a/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go b/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go index 7cb5540c4..f2df70aa3 100644 --- a/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go +++ b/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go @@ -121,6 +121,7 @@ func (c *Controller) Start(ctx context.Context) error { klog.Infof("cluster %s's cni is %s", c.clusterName, calicoCNI) c.cniAdapter = adaper.NewCalicoAdapter(c.config, c.clusterNodeLister, c.processor) if clustercontroller.CheckIsEtcd(cluster) { + klog.Infof("calico store type is %s", clustercontroller.EtcdV3) etcdClient, err := clustercontroller.GetETCDClient(cluster) if err != nil { klog.Errorf("init etcd client err: %v", err)