Skip to content

Commit

Permalink
fix: get calico blockaffinity by etcd
Browse files Browse the repository at this point in the history
Signed-off-by: renxiangyu_yewu <[email protected]>
  • Loading branch information
renxiangyu_yewu committed Feb 8, 2025
1 parent c19942f commit fdfcbab
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 +465,14 @@ func syncIPPool(currentClusterName string, globalExtIPPoolSet ExternalIPPoolSet,
}
}

klog.V(4).Infof("cluster %s has %d ippools to delete, they are", currentClusterName, len(deleteIPPool), deleteIPPool)
klog.V(4).Infof("cluster %s has %d ippools to delete, they are %v", currentClusterName, len(deleteIPPool), deleteIPPool)
err = client.DeleteIPPool(deleteIPPool)
if err != nil {
klog.Errorf("cluster %s delete ippool err: %v", currentClusterName, err)
return err
}

klog.V(4).Infof("cluster %s has %d ippools to modify, they are", currentClusterName, len(modifyIPPool), modifyIPPool)
klog.V(4).Infof("cluster %s has %d ippools to modify, they are %v", currentClusterName, len(modifyIPPool), modifyIPPool)
err = client.CreateOrUpdateCalicoIPPool(modifyIPPool)
if err != nil {
klog.Errorf("cluster %s modify ippool err: %v", currentClusterName, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blockwatchsyncer
package adaper

import (
apiv3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
"github.com/projectcalico/calico/libcalico-go/lib/backend/watchersyncer"
Expand All @@ -11,7 +10,7 @@ import (
func NewBlockWatchSyncer(client api.Client, callbacks api.SyncerCallbacks) api.Syncer {
resourceTypes := []watchersyncer.ResourceType{
{
ListInterface: model.ResourceListOptions{Kind: apiv3.KindBlockAffinity},
ListInterface: model.BlockAffinityListOptions{},
},
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package blockwatchsyncer
package adaper

import (
"time"

"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
)

Expand All @@ -17,17 +20,21 @@ type BlockEventHandler struct {
// Channel for getting updates and status updates from syncer.
syncerC chan interface{}

processor lifted.AsyncWorker
processor lifted.AsyncWorker
clusterNodeLister clusterlister.ClusterNodeLister
// Channel to indicate node status reporter routine is not needed anymore.
done chan struct{}

// Flag to show we are in-sync.
inSync bool
}

func NewBlockEventHandler(processor lifted.AsyncWorker) *BlockEventHandler {
func NewBlockEventHandler(processor lifted.AsyncWorker, clusterNodeLister clusterlister.ClusterNodeLister) *BlockEventHandler {
return &BlockEventHandler{
processor: processor,
processor: processor,
clusterNodeLister: clusterNodeLister,
syncerC: make(chan interface{}, 100),
done: make(chan struct{}),
}
}

Expand Down Expand Up @@ -71,9 +78,20 @@ func (b *BlockEventHandler) OnUpdates(updates []api.Update) {
b.syncerC <- updates
}

// todo put etcd's event info AsyncWorker's queue
func (b *BlockEventHandler) onupdate(_ []api.Update) {
func (b *BlockEventHandler) onupdate(event []api.Update) {
klog.V(7).Info("update event")

for _, update := range event {
blockAffinityKey, ok := update.Key.(model.BlockAffinityKey)
if !ok {
log.Errorf("Failed to cast object to blockAffinityKey: %+v", update.Key)
return
}

node := blockAffinityKey.Host
requeue(node, b.clusterNodeLister, b.processor)
klog.V(4).Infof("Processing blockAffinityKey update: %+v", update.Key)
}
}

func (b *BlockEventHandler) WaitForCacheSync(stopCh <-chan struct{}) bool {
Expand Down
39 changes: 30 additions & 9 deletions pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package adaper

import (
"context"
"fmt"
"strings"

"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/pkg/clusterlink/controllers/nodecidr/adaper/blockwatchsyncer"
clusterlister "github.com/kosmos.io/kosmos/pkg/generated/listers/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/utils/lifted"
)
Expand All @@ -29,22 +33,39 @@ func NewCalicoETCDAdapter(etcdClient api.Client,
}

func (c *CalicoETCDAdapter) Start(stopCh <-chan struct{}) error {
blockEventHandler := blockwatchsyncer.NewBlockEventHandler(c.processor)
c.watchSyncer = blockwatchsyncer.NewBlockWatchSyncer(c.etcdClient, blockEventHandler)
blockEventHandler := NewBlockEventHandler(c.processor, c.clusterNodeLister)
c.watchSyncer = NewBlockWatchSyncer(c.etcdClient, blockEventHandler)
c.watchSyncer.Start()
blockEventHandler.Run(stopCh)
go blockEventHandler.Run(stopCh)

blockEventHandler.WaitForCacheSync(stopCh)
c.sync = true
klog.Info("calico blockaffinities etcd watchsyncer started!")
return nil
}

func (c *CalicoETCDAdapter) GetCIDRByNodeName(_ string) ([]string, error) {
// see calicoctl/calicoctl/commands/datastore/migrate/migrateipam.go
// and libcalico-go/lib/backend/model/block_affinity.go
// todo use c.etcdClient to get blockaffinity in etcd
return nil, nil
func (c *CalicoETCDAdapter) GetCIDRByNodeName(nodeName string) ([]string, error) {
var podCIDRS []string

ctx := context.Background()

blockAffinityKVList, err := c.etcdClient.List(ctx, model.BlockAffinityListOptions{}, "")
if err != nil {
return nil, err
}

for _, item := range blockAffinityKVList.KVPairs {
etcdBlockAffinityKey, ok := item.Key.(model.BlockAffinityKey)
if !ok {
return nil, fmt.Errorf("error converting Key to BlockAffinityKey: %+v", item.Key)
}

if strings.Compare(etcdBlockAffinityKey.Host, nodeName) == 0 {
podCIDRS = append(podCIDRS, etcdBlockAffinityKey.CIDR.String())
}
}

return podCIDRS, nil
}

func (c *CalicoETCDAdapter) Synced() bool {
Expand Down
185 changes: 185 additions & 0 deletions pkg/clusterlink/controllers/nodecidr/adaper/calico_etcd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package adaper

import (
"context"
"fmt"
"os"
"testing"
"time"

"crypto/tls"
"crypto/x509"
"github.com/projectcalico/calico/libcalico-go/lib/apiconfig"
"github.com/projectcalico/calico/libcalico-go/lib/backend/api"
"github.com/projectcalico/calico/libcalico-go/lib/backend/etcdv3"
"github.com/projectcalico/calico/libcalico-go/lib/backend/model"
"github.com/projectcalico/calico/libcalico-go/lib/net"
"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"
)

func TestGetCIDRByNodeName(t *testing.T) {
tests := []struct {
name string
nodeName string
expectCidrs []string
}{
{
name: "test1",
nodeName: "nodeName",
expectCidrs: []string{"10.xxx.xx.0/26"},
},
{
name: "test1",
nodeName: "no-nodeName",
expectCidrs: nil,
},
}
etcdClient, err := createEtcdClient()
if err != nil {
t.Errorf("connect to etcd failed, err:%v\n", err)
}
adapter := &CalicoETCDAdapter{
etcdClient: etcdClient,
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cidrs, err := adapter.GetCIDRByNodeName(tt.nodeName)
if err != nil {
t.Errorf("get cidr by node name failed, err:%v\n", err)
}
assert.Equal(t, tt.expectCidrs, cidrs)
})
}
}

func createEtcdClient() (api.Client, error) {
etcdConfig := apiconfig.EtcdConfig{
EtcdEndpoints: "127.0.0.1:2379",
EtcdKeyFile: "/ssl/server.key",
EtcdCertFile: "/ssl/server.crt",
EtcdCACertFile: "/ssl/ca.crt",
}
etcdV3Client, err := etcdv3.NewEtcdV3Client(&etcdConfig)
if err != nil {
return nil, err
}
return etcdV3Client, nil
}

func TestEtcdV3Client(t *testing.T) {
tests := []struct {
name string
cidr string
nodeName string
expectValue model.BlockAffinity
}{
{
name: "test1",
cidr: "10.222.42.0/26",
nodeName: "node1",
expectValue: model.BlockAffinity{
State: "confirmed",
Deleted: false,
},
},
}

cli, err := createEtcdClient()
if err != nil {
t.Errorf("connect to etcd failed, err:%v\n", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, cidr, err := net.ParseCIDR(test.cidr)
if err != nil {
t.Fatalf("Failed to parse CIDR: %v", err)
}
key := model.BlockAffinityKey{
Host: test.nodeName,
CIDR: *cidr,
}
get, err := cli.Get(ctx, key, "")
if err != nil {
fmt.Printf("create from etcd failed, err:%v\n", err)
return
}
value, ok := get.Value.(*model.BlockAffinity)
if !ok {
t.Errorf("Value is not BlockAffinity, err:%v\n", err)
}
assert.Equal(t, test.expectValue.State, value.State)
assert.Equal(t, test.expectValue.Deleted, value.Deleted)
})
}

cancel()
}

func TestEtcdClient(t *testing.T) {
tests := []struct {
name string
key string
expectValue []byte
}{
{
name: "test1",
key: "/calico/ipam/v2/host/nodeName/ipv4/block/10.xxx.xx.0-26",
expectValue: []byte(`{"state":"confirmed","deleted":false}`),
},
}

cfg := struct {
EtcdEndpoints string
certFile string
keyFile string
caFile string
}{
EtcdEndpoints: "127.0.0.1:2379",
certFile: "/ssl/server.crt",
keyFile: "/ssl/server.key",
caFile: "/ssl/ca.crt",
}
tlsConfig, err := tls.LoadX509KeyPair(cfg.certFile, cfg.keyFile)
if err != nil {
fmt.Printf("load tls cert failed, err:%v\n", err)
return
}
caCert, err := os.ReadFile(cfg.caFile)
if err != nil {
fmt.Printf("read ca cert failed, err:%v\n", err)
return
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{tlsConfig},
RootCAs: caCertPool,
}

cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{cfg.EtcdEndpoints},
DialTimeout: 5 * time.Second,
TLS: tlsCfg,
})
if err != nil {
fmt.Printf("connect to etcd failed, err:%v\n", err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
resp, err := cli.Get(ctx, test.key)
if err != nil {
fmt.Printf("get from etcd failed, err:%v\n", err)
return
}
for _, ev := range resp.Kvs {
assert.Equal(t, test.expectValue, ev.Value)
}
})
}
cancel()
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (c *Controller) Start(ctx context.Context) error {
klog.Infof("cluster %s's cni is %s", c.clusterName, calicoCNI)
c.cniAdapter = adaper.NewCalicoAdapter(c.config, c.clusterNodeLister, c.processor)
if clustercontroller.CheckIsEtcd(cluster) {
klog.Infof("calico store type is %s", clustercontroller.EtcdV3)
etcdClient, err := clustercontroller.GetETCDClient(cluster)
if err != nil {
klog.Errorf("init etcd client err: %v", err)
Expand Down

0 comments on commit fdfcbab

Please sign in to comment.