diff --git a/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go b/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go index 8a11856f4..18757931c 100644 --- a/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go +++ b/pkg/clusterlink/controllers/calicoippool/calicoippool_controller.go @@ -465,14 +465,14 @@ func syncIPPool(currentClusterName string, globalExtIPPoolSet ExternalIPPoolSet, } } - klog.V(4).Infof("cluster %s has %d ippools to delete, they are", currentClusterName, len(deleteIPPool), deleteIPPool) + klog.V(4).Infof("cluster %s has %d ippools to delete, they are %v", currentClusterName, len(deleteIPPool), deleteIPPool) err = client.DeleteIPPool(deleteIPPool) if err != nil { klog.Errorf("cluster %s delete ippool err: %v", currentClusterName, err) return err } - klog.V(4).Infof("cluster %s has %d ippools to modify, they are", currentClusterName, len(modifyIPPool), modifyIPPool) + klog.V(4).Infof("cluster %s has %d ippools to modify, they are %v", currentClusterName, len(modifyIPPool), modifyIPPool) err = client.CreateOrUpdateCalicoIPPool(modifyIPPool) if err != nil { klog.Errorf("cluster %s modify ippool err: %v", currentClusterName, err) diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockaffinitysyncer.go b/pkg/clusterlink/controllers/nodecidr/adaper/blockaffinitysyncer.go similarity index 74% rename from pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockaffinitysyncer.go rename to pkg/clusterlink/controllers/nodecidr/adaper/blockaffinitysyncer.go index c33aaf081..4600ad4eb 100644 --- a/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockaffinitysyncer.go +++ b/pkg/clusterlink/controllers/nodecidr/adaper/blockaffinitysyncer.go @@ -1,7 +1,6 @@ -package blockwatchsyncer +package adaper import ( - apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3" "github.com/projectcalico/calico/libcalico-go/lib/backend/api" "github.com/projectcalico/calico/libcalico-go/lib/backend/model" "github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer" @@ -11,7 +10,7 @@ import ( func NewBlockWatchSyncer(client api.Client, callbacks api.SyncerCallbacks) api.Syncer { resourceTypes := []watchersyncer.ResourceType{ { - ListInterface: model.ResourceListOptions{Kind: apiv3.KindBlockAffinity}, + ListInterface: model.BlockAffinityListOptions{}, }, } diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockeventhandler.go b/pkg/clusterlink/controllers/nodecidr/adaper/blockeventhandler.go similarity index 62% rename from pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockeventhandler.go rename to pkg/clusterlink/controllers/nodecidr/adaper/blockeventhandler.go index a1288b300..6cc693df9 100644 --- a/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer/blockeventhandler.go +++ b/pkg/clusterlink/controllers/nodecidr/adaper/blockeventhandler.go @@ -1,12 +1,15 @@ -package blockwatchsyncer +package adaper import ( "time" "github.com/projectcalico/calico/libcalico-go/lib/backend/api" + "github.com/projectcalico/calico/libcalico-go/lib/backend/model" + log "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" + clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) @@ -17,7 +20,8 @@ type BlockEventHandler struct { // Channel for getting updates and status updates from syncer. syncerC chan interface{} - processor lifted.AsyncWorker + processor lifted.AsyncWorker + clusterNodeLister clusterlister.ClusterNodeLister // Channel to indicate node status reporter routine is not needed anymore. done chan struct{} @@ -25,9 +29,12 @@ type BlockEventHandler struct { inSync bool } -func NewBlockEventHandler(processor lifted.AsyncWorker) *BlockEventHandler { +func NewBlockEventHandler(processor lifted.AsyncWorker, clusterNodeLister clusterlister.ClusterNodeLister) *BlockEventHandler { return &BlockEventHandler{ - processor: processor, + processor: processor, + clusterNodeLister: clusterNodeLister, + syncerC: make(chan interface{}, 100), + done: make(chan struct{}), } } @@ -71,9 +78,20 @@ func (b *BlockEventHandler) OnUpdates(updates []api.Update) { b.syncerC <- updates } -// todo put etcd's event info AsyncWorker's queue -func (b *BlockEventHandler) onupdate(_ []api.Update) { +func (b *BlockEventHandler) onupdate(event []api.Update) { + klog.V(7).Info("update event") + for _, update := range event { + blockAffinityKey, ok := update.Key.(model.BlockAffinityKey) + if !ok { + log.Errorf("Failed to cast object to blockAffinityKey: %+v", update.Key) + return + } + + node := blockAffinityKey.Host + requeue(node, b.clusterNodeLister, b.processor) + klog.V(4).Infof("Processing blockAffinityKey update: %+v", update.Key) + } } func (b *BlockEventHandler) WaitForCacheSync(stopCh <-chan struct{}) bool { diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go b/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go index 7dbf10e19..f34a5a901 100644 --- a/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go +++ b/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go @@ -1,10 +1,14 @@ package adaper import ( + "context" + "fmt" + "strings" + "github.com/projectcalico/calico/libcalico-go/lib/backend/api" + "github.com/projectcalico/calico/libcalico-go/lib/backend/model" "k8s.io/klog/v2" - "github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer" clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/utils/lifted" ) @@ -29,10 +33,10 @@ func NewCalicoETCDAdapter(etcdClient api.Client, } func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error { - blockEventHandler := blockwatchsyncer.NewBlockEventHandler(c.processor) - c.watchSyncer = blockwatchsyncer.NewBlockWatchSyncer(c.etcdClient, blockEventHandler) + blockEventHandler := NewBlockEventHandler(c.processor, c.clusterNodeLister) + c.watchSyncer = NewBlockWatchSyncer(c.etcdClient, blockEventHandler) c.watchSyncer.Start() - blockEventHandler.Run(stopCh) + go blockEventHandler.Run(stopCh) blockEventHandler.WaitForCacheSync(stopCh) c.sync = true @@ -40,11 +44,28 @@ func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error { return nil } -func (c *CalicoETCDAdapter) GetCIDRByNodeName(_ string) ([]string, error) { - // see calicoctl/calicoctl/commands/datastore/migrate/migrateipam.go - // and libcalico-go/lib/backend/model/block_affinity.go - // todo use c.etcdClient to get blockaffinity in etcd - return nil, nil +func (c *CalicoETCDAdapter) GetCIDRByNodeName(nodeName string) ([]string, error) { + var podCIDRS []string + + ctx := context.Background() + + blockAffinityKVList, err := c.etcdClient.List(ctx, model.BlockAffinityListOptions{}, "") + if err != nil { + return nil, err + } + + for _, item := range blockAffinityKVList.KVPairs { + etcdBlockAffinityKey, ok := item.Key.(model.BlockAffinityKey) + if !ok { + return nil, fmt.Errorf("error converting Key to BlockAffinityKey: %+v", item.Key) + } + + if strings.Compare(etcdBlockAffinityKey.Host, nodeName) == 0 { + podCIDRS = append(podCIDRS, etcdBlockAffinityKey.CIDR.String()) + } + } + + return podCIDRS, nil } func (c *CalicoETCDAdapter) Synced() bool { diff --git a/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd_test.go b/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd_test.go new file mode 100644 index 000000000..961b5b426 --- /dev/null +++ b/pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd_test.go @@ -0,0 +1,185 @@ +package adaper + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "crypto/tls" + "crypto/x509" + "github.com/projectcalico/calico/libcalico-go/lib/apiconfig" + "github.com/projectcalico/calico/libcalico-go/lib/backend/api" + "github.com/projectcalico/calico/libcalico-go/lib/backend/etcdv3" + "github.com/projectcalico/calico/libcalico-go/lib/backend/model" + "github.com/projectcalico/calico/libcalico-go/lib/net" + "github.com/stretchr/testify/assert" + clientv3 "go.etcd.io/etcd/client/v3" +) + +func TestGetCIDRByNodeName(t *testing.T) { + tests := []struct { + name string + nodeName string + expectCidrs []string + }{ + { + name: "test1", + nodeName: "nodeName", + expectCidrs: []string{"10.xxx.xx.0/26"}, + }, + { + name: "test1", + nodeName: "no-nodeName", + expectCidrs: nil, + }, + } + etcdClient, err := createEtcdClient() + if err != nil { + t.Errorf("connect to etcd failed, err:%v\n", err) + } + adapter := &CalicoETCDAdapter{ + etcdClient: etcdClient, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cidrs, err := adapter.GetCIDRByNodeName(tt.nodeName) + if err != nil { + t.Errorf("get cidr by node name failed, err:%v\n", err) + } + assert.Equal(t, tt.expectCidrs, cidrs) + }) + } +} + +func createEtcdClient() (api.Client, error) { + etcdConfig := apiconfig.EtcdConfig{ + EtcdEndpoints: "127.0.0.1:2379", + EtcdKeyFile: "/ssl/server.key", + EtcdCertFile: "/ssl/server.crt", + EtcdCACertFile: "/ssl/ca.crt", + } + etcdV3Client, err := etcdv3.NewEtcdV3Client(&etcdConfig) + if err != nil { + return nil, err + } + return etcdV3Client, nil +} + +func TestEtcdV3Client(t *testing.T) { + tests := []struct { + name string + cidr string + nodeName string + expectValue model.BlockAffinity + }{ + { + name: "test1", + cidr: "10.222.42.0/26", + nodeName: "node1", + expectValue: model.BlockAffinity{ + State: "confirmed", + Deleted: false, + }, + }, + } + + cli, err := createEtcdClient() + if err != nil { + t.Errorf("connect to etcd failed, err:%v\n", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _, cidr, err := net.ParseCIDR(test.cidr) + if err != nil { + t.Fatalf("Failed to parse CIDR: %v", err) + } + key := model.BlockAffinityKey{ + Host: test.nodeName, + CIDR: *cidr, + } + get, err := cli.Get(ctx, key, "") + if err != nil { + fmt.Printf("create from etcd failed, err:%v\n", err) + return + } + value, ok := get.Value.(*model.BlockAffinity) + if !ok { + t.Errorf("Value is not BlockAffinity, err:%v\n", err) + } + assert.Equal(t, test.expectValue.State, value.State) + assert.Equal(t, test.expectValue.Deleted, value.Deleted) + }) + } + + cancel() +} + +func TestEtcdClient(t *testing.T) { + tests := []struct { + name string + key string + expectValue []byte + }{ + { + name: "test1", + key: "/calico/ipam/v2/host/nodeName/ipv4/block/10.xxx.xx.0-26", + expectValue: []byte(`{"state":"confirmed","deleted":false}`), + }, + } + + cfg := struct { + EtcdEndpoints string + certFile string + keyFile string + caFile string + }{ + EtcdEndpoints: "127.0.0.1:2379", + certFile: "/ssl/server.crt", + keyFile: "/ssl/server.key", + caFile: "/ssl/ca.crt", + } + tlsConfig, err := tls.LoadX509KeyPair(cfg.certFile, cfg.keyFile) + if err != nil { + fmt.Printf("load tls cert failed, err:%v\n", err) + return + } + caCert, err := os.ReadFile(cfg.caFile) + if err != nil { + fmt.Printf("read ca cert failed, err:%v\n", err) + return + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + tlsCfg := &tls.Config{ + Certificates: []tls.Certificate{tlsConfig}, + RootCAs: caCertPool, + } + + cli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{cfg.EtcdEndpoints}, + DialTimeout: 5 * time.Second, + TLS: tlsCfg, + }) + if err != nil { + fmt.Printf("connect to etcd failed, err:%v\n", err) + return + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + resp, err := cli.Get(ctx, test.key) + if err != nil { + fmt.Printf("get from etcd failed, err:%v\n", err) + return + } + for _, ev := range resp.Kvs { + assert.Equal(t, test.expectValue, ev.Value) + } + }) + } + cancel() +} diff --git a/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go b/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go index 7cb5540c4..f2df70aa3 100644 --- a/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go +++ b/pkg/clusterlink/controllers/nodecidr/nodecidr_controller.go @@ -121,6 +121,7 @@ func (c *Controller) Start(ctx context.Context) error { klog.Infof("cluster %s's cni is %s", c.clusterName, calicoCNI) c.cniAdapter = adaper.NewCalicoAdapter(c.config, c.clusterNodeLister, c.processor) if clustercontroller.CheckIsEtcd(cluster) { + klog.Infof("calico store type is %s", clustercontroller.EtcdV3) etcdClient, err := clustercontroller.GetETCDClient(cluster) if err != nil { klog.Errorf("init etcd client err: %v", err)