Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: get calico blockaffinity by etcd #825

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 +465,14 @@ func syncIPPool(currentClusterName string, globalExtIPPoolSet ExternalIPPoolSet,
}
}

klog.V(4).Infof("cluster %s has %d ippools to delete, they are", currentClusterName, len(deleteIPPool), deleteIPPool)
klog.V(4).Infof("cluster %s has %d ippools to delete, they are %v", currentClusterName, len(deleteIPPool), deleteIPPool)
err = client.DeleteIPPool(deleteIPPool)
if err != nil {
klog.Errorf("cluster %s delete ippool err: %v", currentClusterName, err)
return err
}

klog.V(4).Infof("cluster %s has %d ippools to modify, they are", currentClusterName, len(modifyIPPool), modifyIPPool)
klog.V(4).Infof("cluster %s has %d ippools to modify, they are %v", currentClusterName, len(modifyIPPool), modifyIPPool)
err = client.CreateOrUpdateCalicoIPPool(modifyIPPool)
if err != nil {
klog.Errorf("cluster %s modify ippool err: %v", currentClusterName, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blockwatchsyncer
package adaper

import (
apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
"github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer"
Expand All @@ -11,7 +10,7 @@ import (
func NewBlockWatchSyncer(client api.Client, callbacks api.SyncerCallbacks) api.Syncer {
resourceTypes := []watchersyncer.ResourceType{
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindBlockAffinity},
ListInterface: model.BlockAffinityListOptions{},
},
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package blockwatchsyncer
package adaper

import (
"time"

"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
)

Expand All @@ -17,17 +20,21 @@ type BlockEventHandler struct {
// Channel for getting updates and status updates from syncer.
syncerC chan interface{}

processor lifted.AsyncWorker
// Channel to indicate node status reporter routine is not needed anymore.
processor lifted.AsyncWorker
clusterNodeLister clusterlister.ClusterNodeLister
// Channel that notifies the Run method to exit
done chan struct{}

// Flag to show we are in-sync.
inSync bool
}

func NewBlockEventHandler(processor lifted.AsyncWorker) *BlockEventHandler {
func NewBlockEventHandler(processor lifted.AsyncWorker, clusterNodeLister clusterlister.ClusterNodeLister) *BlockEventHandler {
return &BlockEventHandler{
processor: processor,
processor: processor,
clusterNodeLister: clusterNodeLister,
syncerC: make(chan interface{}, 100),
done: make(chan struct{}),
}
}

Expand Down Expand Up @@ -71,9 +78,20 @@ func (b *BlockEventHandler) OnUpdates(updates []api.Update) {
b.syncerC <- updates
}

// todo put etcd's event info AsyncWorker's queue
func (b *BlockEventHandler) onupdate(_ []api.Update) {
func (b *BlockEventHandler) onupdate(event []api.Update) {
klog.V(7).Info("update event")

for _, update := range event {
blockAffinityKey, ok := update.Key.(model.BlockAffinityKey)
if !ok {
log.Errorf("Failed to cast object to blockAffinityKey: %+v", update.Key)
return
}

node := blockAffinityKey.Host
requeue(node, b.clusterNodeLister, b.processor)
klog.V(4).Infof("Processing blockAffinityKey update: %+v", update.Key)
}
}

func (b *BlockEventHandler) WaitForCacheSync(stopCh <-chan struct{}) bool {
Expand Down
40 changes: 31 additions & 9 deletions pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package adaper

import (
"context"
"fmt"
"strings"

"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer"
clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
)
Expand All @@ -29,22 +33,40 @@ func NewCalicoETCDAdapter(etcdClient api.Client,
}

func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error {
blockEventHandler := blockwatchsyncer.NewBlockEventHandler(c.processor)
c.watchSyncer = blockwatchsyncer.NewBlockWatchSyncer(c.etcdClient, blockEventHandler)
blockEventHandler := NewBlockEventHandler(c.processor, c.clusterNodeLister)
c.watchSyncer = NewBlockWatchSyncer(c.etcdClient, blockEventHandler)
c.watchSyncer.Start()
blockEventHandler.Run(stopCh)
go blockEventHandler.Run(stopCh)

blockEventHandler.WaitForCacheSync(stopCh)
c.sync = true
klog.Info("calico blockaffinities etcd watchsyncer started!")
return nil
}

func (c *CalicoETCDAdapter) GetCIDRByNodeName(_ string) ([]string, error) {
// see calicoctl/calicoctl/commands/datastore/migrate/migrateipam.go
// and libcalico-go/lib/backend/model/block_affinity.go
// todo use c.etcdClient to get blockaffinity in etcd
return nil, nil
func (c *CalicoETCDAdapter) GetCIDRByNodeName(nodeName string) ([]string, error) {
var podCIDRS []string

ctx := context.Background()

blockAffinityKVList, err := c.etcdClient.List(ctx, model.BlockAffinityListOptions{}, "")
if err != nil {
return nil, err
}

for _, item := range blockAffinityKVList.KVPairs {
etcdBlockAffinityKey, ok := item.Key.(model.BlockAffinityKey)
if !ok {
return nil, fmt.Errorf("error converting Key to BlockAffinityKey: %+v", item.Key)
}

if strings.Compare(etcdBlockAffinityKey.Host, nodeName) == 0 {
podCIDRS = append(podCIDRS, etcdBlockAffinityKey.CIDR.String())
klog.V(4).Infof("BlockAffinityKey %v CIDR appended, nodeName: %s", etcdBlockAffinityKey, nodeName)
}
}

return podCIDRS, nil
}

func (c *CalicoETCDAdapter) Synced() bool {
Expand Down
226 changes: 226 additions & 0 deletions pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package adaper

import (
"context"
"errors"
"testing"
"time"

"github.com/projectcalico/calico/libcalico-go/lib/apiconfig"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/etcdv3"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
"github.com/projectcalico/calico/libcalico-go/lib/net"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type MockEtcdClient struct {
mock.Mock
}

func (m *MockEtcdClient) Create(ctx context.Context, object *model.KVPair) (*model.KVPair, error) {
args := m.Called(ctx, object)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*model.KVPair), args.Error(1)
}

func (m *MockEtcdClient) Update(ctx context.Context, object *model.KVPair) (*model.KVPair, error) {
args := m.Called(ctx, object)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*model.KVPair), args.Error(1)
}

func (m *MockEtcdClient) Apply(ctx context.Context, object *model.KVPair) (*model.KVPair, error) {
args := m.Called(ctx, object)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*model.KVPair), args.Error(1)
}

func (m *MockEtcdClient) Delete(ctx context.Context, key model.Key, revision string) (*model.KVPair, error) {
args := m.Called(ctx, key, revision)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*model.KVPair), args.Error(1)
}

func (m *MockEtcdClient) DeleteKVP(ctx context.Context, object *model.KVPair) (*model.KVPair, error) {
args := m.Called(ctx, object)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*model.KVPair), args.Error(1)
}

func (m *MockEtcdClient) Get(ctx context.Context, key model.Key, revision string) (*model.KVPair, error) {
args := m.Called(ctx, key, revision)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*model.KVPair), args.Error(1)
}

func (m *MockEtcdClient) List(ctx context.Context, list model.ListInterface, revision string) (*model.KVPairList, error) {
args := m.Called(ctx, list, revision)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*model.KVPairList), args.Error(1)
}

func (m *MockEtcdClient) Watch(ctx context.Context, list model.ListInterface, revision string) (api.WatchInterface, error) {
args := m.Called(ctx, list, revision)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(api.WatchInterface), args.Error(1)
}

func (m *MockEtcdClient) EnsureInitialized() error {
args := m.Called()
return args.Error(0)
}

func (m *MockEtcdClient) Clean() error {
args := m.Called()
return args.Error(0)
}

func TestGetCIDRByNodeName(t *testing.T) {
mockClient := new(MockEtcdClient)
adapter := &CalicoETCDAdapter{etcdClient: mockClient}

expectedCIDR := "10.1.1.0/26"
mockBlockAffinityKey := model.BlockAffinityKey{
CIDR: *mustParseCIDR(expectedCIDR),
Host: "node1",
}

mockKVPair := &model.KVPair{
Key: mockBlockAffinityKey,
}

mockClient.On("List", mock.Anything, mock.Anything, mock.Anything).
Return(&model.KVPairList{KVPairs: []*model.KVPair{mockKVPair}}, nil)

tests := []struct {
name string
nodeName string
expectCidrs []string
}{
{
name: "valid node",
nodeName: "node1",
expectCidrs: []string{"10.1.1.0/26"},
},
{
name: "invalid node",
nodeName: "no-nodeName",
expectCidrs: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cidrs, err := adapter.GetCIDRByNodeName(tt.nodeName)
assert.NoError(t, err)
assert.Equal(t, tt.expectCidrs, cidrs)
})
}
mockClient.AssertExpectations(t)
}

func mustParseCIDR(cidr string) *net.IPNet {
_, parsedCIDR, _ := net.ParseCIDR(cidr)
return parsedCIDR
}

func TestEtcdV3ClientWithMock(t *testing.T) {
tests := []struct {
name string
cidr string
nodeName string
expectValue model.BlockAffinity
expectErr error
}{
{
name: "test1",
cidr: "10.1.1.0/26",
nodeName: "node1",
expectValue: model.BlockAffinity{
State: "confirmed",
Deleted: false,
},
expectErr: nil,
},
{
name: "test2",
cidr: "10.1.1.0/26",
nodeName: "node2",
expectValue: model.BlockAffinity{},
expectErr: errors.New("key not found"),
},
}

mockEtcd := new(MockEtcdClient)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, cidr, err := net.ParseCIDR(test.cidr)
if err != nil {
t.Fatalf("Failed to parse CIDR: %v", err)
}
key := model.BlockAffinityKey{
Host: test.nodeName,
CIDR: *cidr,
}

expect := test.expectValue
mockValue := &model.KVPair{
Value: &expect,
}
mockEtcd.On("Get", ctx, key, "").Return(mockValue, test.expectErr)

get, err := mockEtcd.Get(ctx, key, "")
if err != nil {
assert.Equal(t, test.expectErr.Error(), err.Error())
return
}

if get != nil {
value, ok := get.Value.(*model.BlockAffinity)
if !ok {
t.Errorf("Value is not BlockAffinity, err:%v\n", err)
}
assert.Equal(t, test.expectValue.State, value.State)
assert.Equal(t, test.expectValue.Deleted, value.Deleted)
}

mockEtcd.AssertExpectations(t)
})
}
}

// nolint
func createEtcdClient() (api.Client, error) {
etcdConfig := apiconfig.EtcdConfig{
EtcdEndpoints: "127.0.0.1:2379",
EtcdKeyFile: "/ssl/server.key",
EtcdCertFile: "/ssl/server.crt",
EtcdCACertFile: "/ssl/ca.crt",
}
etcdV3Client, err := etcdv3.NewEtcdV3Client(&etcdConfig)
if err != nil {
return nil, err
}
return etcdV3Client, nil
}
Loading
Loading