Skip to content

Commit

Permalink
optimize the route connectivity
Browse files Browse the repository at this point in the history
- Add connectivity initialization
- Add connectivity status "DHCP failed" and "ping failed", delete
  "unknown"
- Add pinger timeout and allow 20% packet loss
- Optimize job updating
  • Loading branch information
yaocw2020 authored and guangbochen committed Dec 14, 2021
1 parent 3471c8c commit c3b0e85
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 98 deletions.
5 changes: 1 addition & 4 deletions cmd/network-helper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,7 @@ func run(c *cli.Context) error {
netHelper := helper.New(cni)

for _, selectedNetwork := range selectedNetworks {
networkConf, err := netHelper.GetVLANLayer3Network(&selectedNetwork, dhcpServerIPAddr)
if err != nil {
return fmt.Errorf("failed to get vlan layer3 network, selectedNetwork: %+v, error: %w", selectedNetworks, err)
}
networkConf := netHelper.GetVLANLayer3Network(&selectedNetwork, dhcpServerIPAddr)

if err := netHelper.RecordToNad(&selectedNetwork, networkConf); err != nil {
return fmt.Errorf("failed to record to nad cr, error: %w", err)
Expand Down
177 changes: 94 additions & 83 deletions pkg/controller/manager/nad/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ const (

defaultInterface = "net1"

defaultPingTimes = 5
defaultCheckPeriod = 15 * time.Minute
defaultPingTimes = 5
defaultPingTimeout = 10 * time.Second
defaultCheckPeriod = 15 * time.Minute
defaultAllowPackageLostRate = 20
)

type nameWithNamespace struct {
Expand Down Expand Up @@ -78,7 +80,7 @@ func Register(ctx context.Context, management *config.Management) error {
},
}

go handler.CheckConnectivity()
go handler.CheckConnectivityPeriodically()

nad.OnChange(ctx, ControllerName, handler.OnChange)
nad.OnRemove(ctx, ControllerName, handler.OnRemove)
Expand All @@ -97,22 +99,23 @@ func (h Handler) OnChange(key string, nad *cniv1.NetworkAttachmentDefinition) (*
return nil, err
}

// check annotations
if nad.Annotations == nil || nad.Annotations[utils.KeyNetworkConf] == "" {
return nad, nil
}
networkConf, err := utils.NewLayer3NetworkConf(nad.Annotations[utils.KeyNetworkConf])
if err != nil {
return nil, fmt.Errorf("invalid layer 3 network configure: %w", err)
networkConf := &utils.Layer3NetworkConf{}
if nad.Annotations != nil && nad.Annotations[utils.KeyNetworkConf] != "" {
var err error
networkConf, err = utils.NewLayer3NetworkConf(nad.Annotations[utils.KeyNetworkConf])
if err != nil {
return nil, fmt.Errorf("invalid layer 3 network configure: %w", err)
}
}

klog.Infof("netconf: %+v", networkConf)

if networkConf.CIDR != "" && networkConf.Gateway != "" {
// set connectivity as the initial status unknown
// initialize connectivity
if networkConf.Connectivity == "" {
if err := h.setUnknown(nad, networkConf); err != nil {
return nil, err
if err := h.initializeConnectivity(nad, networkConf); err != nil {
klog.Errorf("initialize connectivity of nad %s/%s failed, error: %v", nad.Namespace, nad.Name, err)
} else {
klog.Infof("initialize connectivity of nad %s/%s successfully", nad.Namespace, nad.Name)
}
}
// add item to map
Expand Down Expand Up @@ -148,20 +151,6 @@ func (h Handler) ensureLabels(nad *cniv1.NetworkAttachmentDefinition) error {
return err
}

// setUnknown marks the connectivity of networkConf as utils.Unknown and
// persists the serialized configuration in the network-conf annotation of nad.
//
// The annotation is written to a deep copy of nad and that copy is what gets
// sent to the API server. The object passed in is never modified, so a NAD
// that comes from a shared cache stays untouched, and a NAD without any
// annotations does not cause a write to a nil map.
func (h Handler) setUnknown(nad *cniv1.NetworkAttachmentDefinition, networkConf *utils.Layer3NetworkConf) error {
	networkConf.Connectivity = utils.Unknown
	nadStr, err := networkConf.ToString()
	if err != nil {
		return err
	}

	nadCopy := nad.DeepCopy()
	if nadCopy.Annotations == nil {
		nadCopy.Annotations = make(map[string]string)
	}
	nadCopy.Annotations[utils.KeyNetworkConf] = nadStr

	_, err = h.nadClient.Update(nadCopy)

	return err
}

func (h Handler) OnRemove(key string, nad *cniv1.NetworkAttachmentDefinition) (*cniv1.NetworkAttachmentDefinition, error) {
if nad == nil {
return nil, nil
Expand Down Expand Up @@ -228,57 +217,51 @@ func constructJob(cur *batchv1.Job, namespace, image, dhcpServerAddr string, nad
return nil, err
}

job.Spec.Template = corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
cniv1.NetworkAttachmentAnnot: selectedNetworks,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
// annotations
if job.Spec.Template.ObjectMeta.Annotations == nil {
job.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
}
job.Spec.Template.ObjectMeta.Annotations[cniv1.NetworkAttachmentAnnot] = selectedNetworks

// podSpec
job.Spec.Template.Spec.Containers = []corev1.Container{
{
Name: jobContainerName,
Image: image,
Env: []corev1.EnvVar{
{
Name: jobContainerName,
Image: image,
Env: []corev1.EnvVar{
{
Name: JobEnvNadNetwork,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: fmt.Sprintf("metadata.annotations['%s']", cniv1.NetworkAttachmentAnnot),
},
},
},
{
Name: JobEnvDHCPServer,
Value: dhcpServerAddr,
Name: JobEnvNadNetwork,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: fmt.Sprintf("metadata.annotations['%s']", cniv1.NetworkAttachmentAnnot),
},
},
ImagePullPolicy: corev1.PullIfNotPresent,
},
{
Name: JobEnvDHCPServer,
Value: dhcpServerAddr,
},
},
RestartPolicy: corev1.RestartPolicyNever,
ServiceAccountName: jobServiceAccountName,
ImagePullPolicy: corev1.PullIfNotPresent,
},
}
backoffLimit := int32(2)

job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever
job.Spec.Template.Spec.ServiceAccountName = jobServiceAccountName
backoffLimit := int32(1)
job.Spec.BackoffLimit = &backoffLimit

return job, nil
}

func (h Handler) CheckConnectivity() {
func (h Handler) CheckConnectivityPeriodically() {
ticker := time.NewTicker(defaultCheckPeriod)

for range ticker.C {
h.mutex.RLock()
for nn, gw := range h.items {
go func(nn nameWithNamespace, gw string) {
connectivity, err := pingGW(gw)
if err != nil {
klog.Error(err)
return
}
if err := h.updateNADConnectivity(nn.namespace, nn.name, connectivity); err != nil {
if err := h.checkConnectivity(nn.namespace, nn.name, gw); err != nil {
klog.Error(err)
return
}
Expand All @@ -288,21 +271,63 @@ func (h Handler) CheckConnectivity() {
}
}

// checkConnectivity pings the gateway gw and stores the resulting connectivity
// status in the network-conf annotation of the NAD namespace/name.
//
// pingGW hands back a status alongside a non-nil error when the ping itself
// cannot be carried out (utils.PingFailed in the current implementation). That
// status is persisted too, so the NAD does not keep a stale result; the ping
// error is returned once the annotation is up to date. An error while reading
// or updating the NAD takes precedence over the ping error.
func (h Handler) checkConnectivity(namespace, name, gw string) error {
	connectivity, pingErr := pingGW(gw)

	nad, err := h.nadCache.Get(namespace, name)
	if err != nil {
		return fmt.Errorf("get cache of %s/%s failed, error: %w", namespace, name, err)
	}

	// A NAD without the annotation is handled as an empty configuration.
	networkConf := &utils.Layer3NetworkConf{}
	if nad.Annotations != nil && nad.Annotations[utils.KeyNetworkConf] != "" {
		networkConf, err = utils.NewLayer3NetworkConf(nad.Annotations[utils.KeyNetworkConf])
		if err != nil {
			return fmt.Errorf("invalid layer 3 network configure: %w", err)
		}
	}

	// Only issue an update when the status actually changed.
	if networkConf.Connectivity != connectivity {
		networkConf.Connectivity = connectivity
		if err := h.updateNetworkConf(nad, networkConf); err != nil {
			return err
		}
	}

	return pingErr
}

// initializeConnectivity pings the gateway recorded in networkConf and stores
// the resulting connectivity status in the network-conf annotation of nad.
//
// pingGW hands back a status alongside a non-nil error when the ping itself
// cannot be carried out (utils.PingFailed in the current implementation). That
// status is persisted as well, so the connectivity field does not stay empty
// after a failed attempt. The ping error is still returned so the caller can
// report that initialization did not succeed. An error while updating the NAD
// takes precedence over the ping error.
func (h Handler) initializeConnectivity(nad *cniv1.NetworkAttachmentDefinition, networkConf *utils.Layer3NetworkConf) error {
	connectivity, pingErr := pingGW(networkConf.Gateway)

	// Only issue an update when the status actually changed.
	if networkConf.Connectivity != connectivity {
		networkConf.Connectivity = connectivity
		if err := h.updateNetworkConf(nad, networkConf); err != nil {
			return err
		}
	}

	return pingErr
}

func pingGW(gw string) (utils.Connectivity, error) {
connectivity := utils.Unknown
connectivity := utils.PingFailed

pinger, err := ping.NewPinger(gw)
if err != nil {
return connectivity, fmt.Errorf("create pinger failed, error: %s", err.Error())
}
pinger.SetPrivileged(true)
pinger.Count = defaultPingTimes
pinger.Timeout = defaultPingTimeout
if err := pinger.Run(); err != nil {
return connectivity, err
return connectivity, fmt.Errorf("ping gw %s failed, error: %w", gw, err)
} // blocks until finished
stats := pinger.Statistics()

if stats.PacketsSent != stats.PacketsRecv {
if stats.PacketLoss > defaultAllowPackageLostRate {
connectivity = utils.Unconnectable
} else {
connectivity = utils.Connectable
Expand All @@ -311,32 +336,18 @@ func pingGW(gw string) (utils.Connectivity, error) {
return connectivity, nil
}

func (h Handler) updateNADConnectivity(namespace, name string, connectivity utils.Connectivity) error {
nad, err := h.nadCache.Get(namespace, name)
if err != nil {
return fmt.Errorf("get cache of %s/%s failed, error: %s", namespace, name, err)
}
if nad.Annotations == nil || nad.Annotations[utils.KeyNetworkConf] == "" {
return nil
}
networkConf, err := utils.NewLayer3NetworkConf(nad.Annotations[utils.KeyNetworkConf])
if err != nil {
return fmt.Errorf("invalid layer 3 network configure: %w", err)
}

if networkConf.Connectivity == connectivity {
return nil
}

networkConf.Connectivity = connectivity
func (h Handler) updateNetworkConf(nad *cniv1.NetworkAttachmentDefinition, networkConf *utils.Layer3NetworkConf) error {
nadCopy := nad.DeepCopy()
confStr, err := networkConf.ToString()
if err != nil {
return err
}
if nadCopy.Annotations == nil {
nadCopy.Annotations = make(map[string]string)
}
nadCopy.Annotations[utils.KeyNetworkConf] = confStr
if _, err := h.nadClient.Update(nadCopy); err != nil {
return err
return fmt.Errorf("update nad %s/%s failed, error: %w", nad.Namespace, nad.Name, err)
}

return nil
Expand Down
23 changes: 13 additions & 10 deletions pkg/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
ctlcniv1 "github.com/harvester/harvester/pkg/generated/controllers/k8s.cni.cncf.io/v1"
nadv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"

"github.com/harvester/harvester-network-controller/pkg/utils"
)
Expand All @@ -21,19 +22,21 @@ func New(cniFactory *ctlcni.Factory) *NetHelper {
}
}

func (n *NetHelper) GetVLANLayer3Network(selectedNetwork *nadv1.NetworkSelectionElement, serverIPAddr string) (*utils.Layer3NetworkConf, error) {
func (n *NetHelper) GetVLANLayer3Network(selectedNetwork *nadv1.NetworkSelectionElement, serverIPAddr string) *utils.Layer3NetworkConf {
networkConf := &utils.Layer3NetworkConf{
Mode: utils.Auto,
ServerIPAddr: serverIPAddr,
}
cidr, gw, err := obtainCIDRAndGw(selectedNetwork.InterfaceRequest, net.ParseIP(serverIPAddr))
if err != nil {
return nil, err
if err == nil {
networkConf.CIDR = cidr.String()
networkConf.Gateway = gw.String()
} else {
klog.Errorf("obtain CIDR and gw using DHCP protocol failed, error: %v", err)
networkConf.Connectivity = utils.DHCPFailed
}

return &utils.Layer3NetworkConf{
Mode: utils.Auto,
ServerIPAddr: serverIPAddr,
CIDR: cidr.String(),
Gateway: gw.String(),
Connectivity: utils.Unknown,
}, nil
return networkConf
}

func (n *NetHelper) RecordToNad(selectedNetwork *nadv1.NetworkSelectionElement, networkConf *utils.Layer3NetworkConf) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/utils/nadconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ type Connectivity string
const (
Connectable Connectivity = "true"
Unconnectable Connectivity = "false"
Unknown Connectivity = "unknown"
DHCPFailed Connectivity = "DHCP failed"
PingFailed Connectivity = "ping failed"
)

type Mode string
Expand Down

0 comments on commit c3b0e85

Please sign in to comment.