neonvm-controller/ipam: add test
Signed-off-by: Oleg Vasilev <[email protected]>
Omrigan committed Feb 25, 2025
1 parent 88bb6b9 commit 193e6b7
Showing 4 changed files with 215 additions and 15 deletions.
1 change: 0 additions & 1 deletion neonvm-controller/cmd/main.go
@@ -43,7 +43,6 @@ import (
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
12 changes: 6 additions & 6 deletions pkg/neonvm/ipam/client.go
@@ -11,9 +11,9 @@ import (

// Set of kubernetes clients
type Client struct {
kubeClient kubernetes.Interface
vmClient neonvm.Interface
nadClient nad.Interface
KubeClient kubernetes.Interface
VMClient neonvm.Interface
NADClient nad.Interface
}

func NewKubeClient(cfg *rest.Config) (*Client, error) {
@@ -31,8 +31,8 @@ func NewKubeClient(cfg *rest.Config) (*Client, error) {
}

return &Client{
kubeClient: kubeClient,
vmClient: vmClient,
nadClient: nadClient,
KubeClient: kubeClient,
VMClient: vmClient,
NADClient: nadClient,
}, nil
}
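Exporting these fields is what enables the new unit test: a test can assemble a Client from fake clientsets instead of building real ones from a *rest.Config. A minimal sketch of that wiring, mirroring the makeIPAM helper added below (package aliases follow the test file's imports):

	client := ipam.Client{
		KubeClient: kfake.NewSimpleClientset(),   // k8s.io/client-go/kubernetes/fake
		NADClient:  nadfake.NewSimpleClientset(), // fake network-attachment-definition clientset
		VMClient:   nfake.NewSimpleClientset(),   // fake NeonVM clientset
	}
	// Hand the fakes to the IPAM constructor introduced in ipam.go below.
	manager, err := ipam.NewWithClient(&client, "nad", "default")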
21 changes: 13 additions & 8 deletions pkg/neonvm/ipam/ipam.go
@@ -19,7 +19,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
neonvm "github.com/neondatabase/autoscaling/neonvm/client/clientset/versioned"
@@ -75,13 +74,13 @@ func (i *IPAM) ReleaseIP(ctx context.Context, vmName types.NamespacedName) (net.

func (i *IPAM) RunCleanup(ctx context.Context, namespace string) error {
for {
vms, err := i.vmClient.NeonvmV1().VirtualMachines(namespace).List(ctx, metav1.ListOptions{})
vms, err := i.VMClient.NeonvmV1().VirtualMachines(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("error listing virtual machines: %w", err)
}
vmsMap := make(map[string]bool)
for _, vm := range vms.Items {
vmsMap[vm.Name] = true
vmsMap[fmt.Sprintf("%s/%s", vm.Namespace, vm.Name)] = true
}
_, err = i.runIPAM(ctx, getCleanupAction(vmsMap))
if err != nil {
@@ -112,9 +111,12 @@ func New(nadName string, nadNamespace string) (*IPAM, error) {
if err != nil {
return nil, fmt.Errorf("error creating kubernetes client: %w", err)
}
return NewWithClient(kClient, nadName, nadNamespace)
}

func NewWithClient(kClient *Client, nadName string, nadNamespace string) (*IPAM, error) {
// read network-attachment-definition from Kubernetes
nad, err := kClient.nadClient.K8sCniCncfIoV1().NetworkAttachmentDefinitions(nadNamespace).Get(context.Background(), nadName, metav1.GetOptions{})
nad, err := kClient.NADClient.K8sCniCncfIoV1().NetworkAttachmentDefinitions(nadNamespace).Get(context.Background(), nadName, metav1.GetOptions{})
if err != nil {
return nil, err
}
@@ -126,6 +128,9 @@ func New(nadName string, nadNamespace string) (*IPAM, error) {
if err != nil {
return nil, fmt.Errorf("network-attachment-definition IPAM config parse error: %w", err)
}
if len(ipamConfig.IPRanges) == 0 {
return nil, fmt.Errorf("network-attachment-definition %s has not IP ranges", nad.Name)
}

return &IPAM{
Config: *ipamConfig,
@@ -272,7 +277,7 @@ func (i *IPAM) runIPAMRange(ctx context.Context, ipRange RangeConfiguration, act

// Status does a List() request to check NeonVM client connectivity
func (i *IPAM) Status(ctx context.Context) error {
_, err := i.vmClient.NeonvmV1().IPPools(i.Config.NetworkNamespace).List(ctx, metav1.ListOptions{})
_, err := i.VMClient.NeonvmV1().IPPools(i.Config.NetworkNamespace).List(ctx, metav1.ListOptions{})
return err
}

@@ -305,7 +310,7 @@ func (i *IPAM) getNeonvmIPPool(ctx context.Context, ipRange string) (*NeonvmIPPo
poolName = fmt.Sprintf("%s-%s", i.Config.NetworkName, strings.ReplaceAll(ipRange, "/", "-"))
}

pool, err := i.vmClient.NeonvmV1().IPPools(i.Config.NetworkNamespace).Get(ctx, poolName, metav1.GetOptions{})
pool, err := i.VMClient.NeonvmV1().IPPools(i.Config.NetworkNamespace).Get(ctx, poolName, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
// pool does not exist, create it
newPool := &vmv1.IPPool{
@@ -318,7 +323,7 @@ func (i *IPAM) getNeonvmIPPool(ctx context.Context, ipRange string) (*NeonvmIPPo
Allocations: make(map[string]vmv1.IPAllocation),
},
}
_, err = i.vmClient.NeonvmV1().IPPools(i.Config.NetworkNamespace).Create(ctx, newPool, metav1.CreateOptions{})
_, err = i.VMClient.NeonvmV1().IPPools(i.Config.NetworkNamespace).Create(ctx, newPool, metav1.CreateOptions{})
if err != nil && apierrors.IsAlreadyExists(err) {
// the pool was just created -- allow retry
return nil, &temporaryError{err}
@@ -338,7 +343,7 @@ func (i *IPAM) getNeonvmIPPool(ctx context.Context, ipRange string) (*NeonvmIPPo
}

return &NeonvmIPPool{
vmClient: i.Client.vmClient,
vmClient: i.Client.VMClient,
pool: pool,
firstip: ip,
}, nil
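The vmsMap change above keys live VMs by "namespace/name", which matches the ContainerID format the IP pool stores for each allocation (e.g. "default/vm2" in the test below), so the cleanup pass can tell which allocations still belong to an existing VM. getCleanupAction itself is not part of this diff; the following is only an assumed sketch, using the IPPool types shown here, of the comparison it performs:

	// Assumed sketch (getCleanupAction is not shown in this commit):
	// an allocation whose owner is not a live VM gets released.
	for offset, alloc := range pool.Spec.Allocations {
		if !vmsMap[alloc.ContainerID] { // ContainerID is "namespace/name", e.g. "default/vm2"
			delete(pool.Spec.Allocations, offset) // release the stale IP
		}
	}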
196 changes: 196 additions & 0 deletions pkg/neonvm/ipam/ipam_test.go
@@ -0,0 +1,196 @@
package ipam_test

import (
"context"
"fmt"
"testing"
"time"

nadv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
nadfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kfake "k8s.io/client-go/kubernetes/fake"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
nfake "github.com/neondatabase/autoscaling/neonvm/client/clientset/versioned/fake"
"github.com/neondatabase/autoscaling/pkg/neonvm/ipam"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

func makeIPAM(t *testing.T, cfg string) *ipam.IPAM {
client := ipam.Client{
KubeClient: kfake.NewSimpleClientset(),
VMClient: nfake.NewSimpleClientset(),
NADClient: nadfake.NewSimpleClientset(),
}
_, err := client.NADClient.K8sCniCncfIoV1().NetworkAttachmentDefinitions("default").Create(context.Background(), &nadv1.NetworkAttachmentDefinition{
TypeMeta: metav1.TypeMeta{
Kind: "NetworkAttachmentDefinition",
APIVersion: "k8s.cni.cncf.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "nad",
},
Spec: nadv1.NetworkAttachmentDefinitionSpec{
Config: fmt.Sprintf(`{"ipam":%s}`, cfg),
},
}, metav1.CreateOptions{})
require.NoError(t, err)

ipam, err := ipam.NewWithClient(&client, "nad", "default")
require.NoError(t, err)

return ipam
}

func TestIPAM(t *testing.T) {
ipam := makeIPAM(t,
`{
"ipRanges": [
{
"range":"10.100.123.0/24",
"range_start":"10.100.123.1",
"range_end":"10.100.123.254",
"network_name":"nad"
}
]
}`,
)

defer ipam.Close()

name := types.NamespacedName{
Namespace: "default",
Name: "vm",
}

ip1, err := ipam.AcquireIP(context.Background(), name)
require.NoError(t, err)
require.NotNil(t, ip1)
assert.Equal(t, "10.100.123.1/24", ip1.String())

// Same VM - same IP
ipResult, err := ipam.AcquireIP(context.Background(), name)
require.NoError(t, err)
require.NotNil(t, ipResult)
assert.Equal(t, ip1, ipResult)

// Different VM - different IP
name.Name = "vm2"
ip2, err := ipam.AcquireIP(context.Background(), name)
require.NoError(t, err)
require.NotNil(t, ip2)
assert.Equal(t, "10.100.123.2/24", ip2.String())

// Release the second IP
ipResult, err = ipam.ReleaseIP(context.Background(), name)
require.NoError(t, err)
require.Equal(t, ip2, ipResult)

// Allocate it again
name.Name = "vm3"
ip3, err := ipam.AcquireIP(context.Background(), name)
require.NoError(t, err)
require.NotNil(t, ip3)
assert.Equal(t, ip2, ip3)
}

func TestIPAMCleanup(t *testing.T) {
ipam := makeIPAM(t,
`{
"ipRanges": [
{
"range":"10.100.123.0/24",
"range_start":"10.100.123.1",
"range_end":"10.100.123.254"
}
],
"network_name":"nadNetworkName"
}`,
)

defer ipam.Close()

name := types.NamespacedName{
Namespace: "default",
Name: "vm",
}

ip, err := ipam.AcquireIP(context.Background(), name)
require.NoError(t, err)
require.NotNil(t, ip)

lst, err := ipam.Client.VMClient.NeonvmV1().IPPools("default").List(context.Background(), metav1.ListOptions{})
require.NoError(t, err)
require.NotNil(t, lst)

getIPPool := func() *vmv1.IPPool {
ipPool, err := ipam.Client.VMClient.NeonvmV1().IPPools("default").Get(context.Background(), "nadNetworkName-10.100.123.0-24", metav1.GetOptions{})
require.NoError(t, err)
require.NotNil(t, ipPool)
return ipPool
}

ipPool := getIPPool()

assert.Equal(t, ipPool.Spec.Range, "10.100.123.0/24")
assert.Equal(t, map[string]vmv1.IPAllocation{
// IP offset: allocation
"1": {
ContainerID: "default/vm",
PodRef: "",
},
}, ipPool.Spec.Allocations)

name2 := types.NamespacedName{
Namespace: "default",
Name: "vm2",
}
ip2, err := ipam.AcquireIP(context.Background(), name2)
require.NoError(t, err)
require.NotNil(t, ip2)
assert.Equal(t, ip2.String(), "10.100.123.2/24")

// Let's create only the second VM
// The cleanup will release the first IP, but will keep the second
result, err := ipam.Client.VMClient.NeonvmV1().VirtualMachines("default").Create(context.Background(), &vmv1.VirtualMachine{
ObjectMeta: metav1.ObjectMeta{
Name: "vm2",
},
}, metav1.CreateOptions{})
require.NoError(t, err)
require.NotNil(t, result)

ipPool = getIPPool()
assert.Len(t, ipPool.Spec.Allocations, 2)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

tg := taskgroup.NewGroup(zap.NewNop())

defer tg.Wait() //nolint:errcheck // always nil

tg.Go("cleanup", func(logger *zap.Logger) error {
err := ipam.RunCleanup(ctx, "default")
require.ErrorContains(t, err, "context canceled")
return nil
})

for i := 0; i < 100; i++ {
ipPool = getIPPool()
if len(ipPool.Spec.Allocations) == 1 {
// Test succeeded
assert.Equal(t, ipPool.Spec.Allocations["2"].ContainerID, "default/vm2")
cancel()
return
}
time.Sleep(100 * time.Millisecond)
}
require.Fail(t, "cleanup did not finish", ipPool.Spec.Allocations)
}
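The new tests run entirely against the fake clientsets, so no cluster access is needed; with a standard checkout of the repository they should run with the usual Go tooling:

	go test ./pkg/neonvm/ipam/...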
