From 5724ca0d07ffff49c9095f718821856aa9cfec50 Mon Sep 17 00:00:00 2001 From: Jeewan Rana Date: Thu, 2 Nov 2017 12:16:00 -0700 Subject: [PATCH] network policy chages --- netmaster/k8snetwork/networkpolicy.go | 395 +++++++++++++++++++++++--- utils/k8sutils/k8sutils.go | 22 ++ 2 files changed, 376 insertions(+), 41 deletions(-) diff --git a/netmaster/k8snetwork/networkpolicy.go b/netmaster/k8snetwork/networkpolicy.go index 8a5b57533..4a8979d18 100644 --- a/netmaster/k8snetwork/networkpolicy.go +++ b/netmaster/k8snetwork/networkpolicy.go @@ -7,9 +7,10 @@ import ( "github.com/contiv/client-go/kubernetes" "github.com/contiv/client-go/pkg/api/v1" "github.com/contiv/client-go/pkg/apis/extensions/v1beta1" + "github.com/contiv/client-go/pkg/labels" "github.com/contiv/client-go/pkg/watch" "github.com/contiv/contivmodel/client" - "github.com/contiv/netplugin/utils/k8sutils" + k8sutils "github.com/contiv/netplugin/utils/k8sutils" "reflect" "strings" "time" @@ -18,14 +19,37 @@ import ( const defaultTenantName = "default" const defaultNetworkName = "net" const defaultSubnet = "10.1.2.0/24" -const defaultEpgName = "ingress-group" + +//const defaultEpgName = "ingress-group" +const defaultEpgName = "default-epg" const defaultPolicyName = "ingress-policy" const defaultRuleID = "1" +const defaultPolicyPriority = 2 + +type k8sPodSelector struct { + TenantName string + NetworkName string + PodIps []string + GroupName string +} +type k8sPolicyPorts struct { + Port int + Protocol string +} +type k8sIngress struct { + IngressRules []k8sPolicyPorts + IngressPodSelector []*k8sPodSelector +} +type k8sNetworkPolicy struct { + PodSelector *k8sPodSelector + Ingress []k8sIngress +} type k8sContext struct { - k8sClientSet *kubernetes.Clientset - contivClient *client.ContivClient - isLeader func() bool + k8sClientSet *kubernetes.Clientset + contivClient *client.ContivClient + isLeader func() bool + networkPolicy map[string]k8sNetworkPolicy } var npLog *log.Entry @@ -67,7 +91,7 @@ func (k8sNet *k8sContext) createNetwork(nwName string) error { for func() error { _, err := k8sNet.contivClient.NetworkGet(defaultTenantName, nwName) return err - }() != nil { + }() != nil { //XXX:Should we really poll here ; there would be chances on genuine error and it cause infinity loop time.Sleep(time.Millisecond * 100) } return nil @@ -89,7 +113,7 @@ func (k8sNet *k8sContext) deleteNetwork(nwName string) error { for func() error { _, err := k8sNet.contivClient.NetworkGet(defaultTenantName, nwName) return err - }() == nil { + }() == nil { //XXX: Same here as above time.Sleep(time.Millisecond * 100) } return nil @@ -115,7 +139,7 @@ func (k8sNet *k8sContext) createEpg(nwName, epgName, policyName string) error { for func() error { _, err := k8sNet.contivClient.EndpointGroupGet(defaultTenantName, epgName) return err - }() != nil { + }() != nil { //XXX: Same as above time.Sleep(time.Millisecond * 100) } return nil @@ -136,29 +160,31 @@ func (k8sNet *k8sContext) deleteEpg(networkname, epgName, policyName string) err for func() error { _, err := k8sNet.contivClient.EndpointGroupGet(defaultTenantName, epgName) return err - }() == nil { + }() == nil { //Same as above time.Sleep(time.Millisecond * 100) } return nil } -func (k8sNet *k8sContext) createPolicy(policyName string) error { - npLog.Infof("create policy %s", policyName) +func (k8sNet *k8sContext) createPolicy(tenantName string, epgName string) error { + policyName := k8sutils.EpgNameToPolicy(epgName) - if _, err := k8sNet.contivClient.PolicyGet(defaultTenantName, policyName); err == nil { + npLog.Infof("create policy: %s:%s", policyName, tenantName) + + if _, err := k8sNet.contivClient.PolicyGet(tenantName, policyName); err == nil { return nil } if err := k8sNet.contivClient.PolicyPost(&client.Policy{ - TenantName: defaultTenantName, + TenantName: tenantName, PolicyName: policyName, }); err != nil { - npLog.Errorf("failed to create policy %s", err) + npLog.Errorf("failed to create policy: %v", err) return err } for func() error { - _, err := k8sNet.contivClient.PolicyGet(defaultTenantName, policyName) + _, err := k8sNet.contivClient.PolicyGet(tenantName, policyName) return err }() != nil { time.Sleep(time.Millisecond * 100) @@ -181,36 +207,30 @@ func (k8sNet *k8sContext) deletePolicy(policyName string) error { for func() error { _, err := k8sNet.contivClient.PolicyGet(defaultTenantName, policyName) return err - }() == nil { + }() == nil { //XXX: Same as av time.Sleep(time.Millisecond * 100) } return nil } -func (k8sNet *k8sContext) createRule(policyName, ruleID, action string) error { - npLog.Infof("create rule %s[%s] [%s]", policyName, ruleID, action) +func (k8sNet *k8sContext) createRule(cRule *client.Rule) error { + npLog.Infof("create rule: %+v", *cRule) - if val, err := k8sNet.contivClient.RuleGet(defaultTenantName, policyName, ruleID); err == nil { - if val.Action != action { - k8sNet.deleteRule(policyName, ruleID) + if val, err := k8sNet.contivClient.RuleGet(cRule.TenantName, cRule.PolicyName, cRule.RuleID); err == nil { + if val.Action != cRule.Action { + k8sNet.deleteRule(cRule.TenantName, cRule.PolicyName, cRule.RuleID) } else { return nil } } - if err := k8sNet.contivClient.RulePost(&client.Rule{ - TenantName: defaultTenantName, - PolicyName: policyName, - RuleID: ruleID, - Direction: "in", - Action: action, - }); err != nil { - npLog.Errorf("failed to create rule-id [%s] %s", ruleID, err) + if err := k8sNet.contivClient.RulePost(cRule); err != nil { + npLog.Errorf("failed to create rule: %s, %v", cRule.RuleID, err) return err } for func() error { - _, err := k8sNet.contivClient.RuleGet(defaultTenantName, policyName, ruleID) + _, err := k8sNet.contivClient.RuleGet(cRule.TenantName, cRule.PolicyName, cRule.RuleID) return err }() != nil { time.Sleep(time.Millisecond * 100) @@ -218,27 +238,26 @@ func (k8sNet *k8sContext) createRule(policyName, ruleID, action string) error { return nil } -func (k8sNet *k8sContext) deleteRule(policyName, ruleID string) error { - npLog.Infof("delete rule-id %s", ruleID) +func (k8sNet *k8sContext) deleteRule(tenantName string, policyName, ruleID string) error { + npLog.Infof("delete rule: %s:%s", ruleID, policyName) - if _, err := k8sNet.contivClient.RuleGet(defaultTenantName, policyName, ruleID); err != nil { + if _, err := k8sNet.contivClient.RuleGet(tenantName, policyName, ruleID); err != nil { return nil } - if err := k8sNet.contivClient.RuleDelete(defaultTenantName, policyName, ruleID); err != nil { - npLog.Errorf("failed to delete rule %s[%s], %s", policyName, ruleID, err) + if err := k8sNet.contivClient.RuleDelete(tenantName, policyName, ruleID); err != nil { + npLog.Errorf("failed to delete rule: %s:%s, %v", ruleID, policyName, err) return err } for func() error { - _, err := k8sNet.contivClient.RuleGet(defaultTenantName, policyName, ruleID) + _, err := k8sNet.contivClient.RuleGet(tenantName, policyName, ruleID) return err }() == nil { time.Sleep(time.Millisecond * 100) } return nil } - func (k8sNet *k8sContext) getIsolationPolicy(annotations map[string]string) string { var inPolicy struct { Ingress map[string]string `json:"ingress"` @@ -273,7 +292,7 @@ func (k8sNet *k8sContext) updateDefaultIngressPolicy(ns string, action string) { return } - if err = k8sNet.createPolicy(policyName); err != nil { + if err = k8sNet.createPolicy(defaultTenantName, policyName); err != nil { npLog.Errorf("failed to update policy %s, %s", policyName, err) return } @@ -283,7 +302,13 @@ func (k8sNet *k8sContext) updateDefaultIngressPolicy(ns string, action string) { return } - if err = k8sNet.createRule(policyName, defaultRuleID, action); err != nil { + if err = k8sNet.createRule(&client.Rule{ + TenantName: defaultTenantName, + PolicyName: policyName, + RuleID: defaultRuleID, + Priority: defaultPolicyPriority, + Direction: "in", + Action: "allow"}); err != nil { npLog.Errorf("failed to update default rule, %s", err) return } @@ -296,7 +321,7 @@ func (k8sNet *k8sContext) deleteDefaultIngressPolicy(ns string) { var err error - if err = k8sNet.deleteRule(policyName, defaultRuleID); err != nil { + if err = k8sNet.deleteRule(defaultTenantName, policyName, defaultRuleID); err != nil { npLog.Errorf("failed to delete default rule, %s", err) return } @@ -319,7 +344,10 @@ func (k8sNet *k8sContext) processK8sNetworkPolicy(opCode watch.EventType, np *v1 npLog.Infof("process [%s] network policy %+v", opCode, np) switch opCode { - case watch.Added, watch.Modified: + case watch.Added: + npLog.Infof("Recv [%s] network policy event", opCode, np) + k8sNet.addNetworkPolicy(np) + case watch.Modified: case watch.Deleted: } } @@ -395,3 +423,288 @@ func InitK8SServiceWatch(listenURL string, isLeader func() bool) error { go kubeNet.handleK8sEvents() return nil } + +func (k8sNet *k8sContext) addNetworkPolicy(np *v1beta1.NetworkPolicy) { + //Get all pods which belongs to given label selector + npPodSelector, err := k8sNet.getPodSelector(np.Spec.PodSelector.MatchLabels) + if err != nil { + npLog.Warnf("ignore network policy: %s, %v", np.Name, err) + return + } + npLog.Infof("network policy [%s] pod-selector: %+v", np.Name, npPodSelector) + IngressRules, err := k8sNet.getIngressPolicy(np.Spec.Ingress) + if err != nil { + npLog.Warnf("ignore network policy: %s, %v", np.Name, err) + return + } + npLog.Infof("network Policy [%s] IngressPolicy: %+v", np.Name, IngressRules) + if _, ok := k8sNet.networkPolicy[np.Name]; ok { + npLog.Warnf("delete existing network policy: %s !", np.Name) + k8sNet.deleteNetworkPolicy(np) + } + + nwPolicy := k8sNetworkPolicy{PodSelector: npPodSelector, Ingress: IngressRules} + + npLog.Info("Going to Send Network Policy %+v", nwPolicy) + + if err := k8sNet.applyContivNetworkPolicy(&nwPolicy); err != nil { + npLog.Errorf("[%s] failed to configure policy, %v", np.Name, err) + return + } + + k8sNet.networkPolicy[np.Name] = nwPolicy + +} + +func (k8sNet *k8sContext) applyContivNetworkPolicy(np *k8sNetworkPolicy) error { + var err error + + // don't configure from multiple masters + if k8sNet.isLeader() != true { + return err + } + + // reset policy to deny on any error + policyResetOnErr := func(tenantName, groupName string) { + if err != nil { + //k8sNet.resetPolicy(tenantName, groupName) + } + } + + // policy + if err = k8sNet.createDefaultPolicy(np.PodSelector.TenantName, np.PodSelector.GroupName); err != nil { + npLog.Errorf("failed to create policy %+v, %v", np.PodSelector, err) + return err + } + + defer policyResetOnErr(np.PodSelector.TenantName, np.PodSelector.GroupName) + + // src epg + if _, err := k8sNet.contivClient.EndpointGroupGet(np.PodSelector.TenantName, + np.PodSelector.GroupName); err != nil { + npLog.Infof("epg: %+v doesn't exist", np.PodSelector) + return nil + } + npLog.Info("Got resp from Contiv EndPoint %+v", np.PodSelector) + + // Add epg and rules + for _, ingress := range np.Ingress { + for _, from := range ingress.IngressPodSelector { + // from/to epgs + if _, err := k8sNet.contivClient.EndpointGroupGet(from.TenantName, from.GroupName); err != nil { + npLog.Infof("epg: %+v doesn't exist", from) + return nil + } + npLog.Info("From EndPoint xxContiv EndPoint %+v", from) + + // rules + for _, port := range ingress.IngressRules { + npLog.Infof("configure contiv policy: %+v", port) + for _, FromIP := range from.PodIps { + for _, ToIP := range np.PodSelector.PodIps { + ruleId := k8sutils.PolicyToRuleID(from.GroupName, port.Protocol, port.Port, "in") + if err = k8sNet.createRule(&client.Rule{ + TenantName: np.PodSelector.TenantName, + PolicyName: k8sutils.EpgNameToPolicy(np.PodSelector.GroupName), + FromIpAddress: FromIP, + ToIpAddress: ToIP, + RuleID: ruleId, + Protocol: strings.ToLower(port.Protocol), + Priority: defaultPolicyPriority, + Port: port.Port, + Direction: "in", + Action: "allow"}); err != nil { + npLog.Errorf("failed to create rules in in-policy %+v, %v", np.PodSelector, err) + return err + } + } + } + } + } + } + + return nil +} +func (k8sNet *k8sContext) deleteNetworkPolicy(np *v1beta1.NetworkPolicy) { + npLog.Infof("delete network policy: %s", np.Name) + policy, ok := k8sNet.networkPolicy[np.Name] + if !ok { + npLog.Errorf("network policy: %s is not found", np.Name) + return + } + + if err := k8sNet.cleanupContivNetworkPolicy(&policy); err != nil { + npLog.Errorf("failed to delete network policy: %s, %v", np.Name, err) + return + } + //Remove PolicyId from Policy Db + delete(k8sNet.networkPolicy, np.Name) + +} +func (k8sNet *k8sContext) cleanupContivNetworkPolicy(np *k8sNetworkPolicy) error { + var retErr error + + // don't configure from multiple masters + if k8sNet.isLeader() != true { + return nil + } + policyName := k8sutils.EpgNameToPolicy(np.PodSelector.GroupName) + for _, ingress := range np.Ingress { + for _, from := range ingress.IngressPodSelector { + for _, port := range ingress.IngressRules { + for _, direction := range []string{"in", "out"} { + ruleID := k8sutils.PolicyToRuleID(from.GroupName, port.Protocol, + port.Port, direction) + policyName := k8sutils.EpgNameToPolicy(np.PodSelector.GroupName) + + if err := k8sNet.deleteRule(np.PodSelector.TenantName, policyName, ruleID); err != nil { + npLog.Warnf("failed to delete policy: %s rule: %s, %v", + policyName, ruleID, err) + retErr = err + // try deleting other config + } + } + } + + if err := k8sNet.deleteEpg(from.TenantName, from.GroupName, policyName); err != nil { + npLog.Warnf("failed to delete epg: %+v", from) + retErr = err + } else { + if err := k8sNet.deletePolicy(policyName); err != nil { + npLog.Warnf("failed to delete policy: %s:%s", from.TenantName, from.GroupName) + retErr = err + } + } + } + } + + // delete pod selector epg + if err := k8sNet.deleteEpg(np.PodSelector.TenantName, np.PodSelector.GroupName, policyName); err != nil { + npLog.Warnf("failed to delete epg: %+v", np.PodSelector) + retErr = err + } else { + if err := k8sNet.deletePolicy(policyName); err != nil { + npLog.Warnf("failed to delete policy: %s", np.PodSelector) + retErr = err + } + } + + return retErr +} + +func (k8sNet *k8sContext) getPolicyPorts(policyPort []v1beta1.NetworkPolicyPort) []k8sPolicyPorts { + rules := []k8sPolicyPorts{} + + for _, pol := range policyPort { + port := 0 + protocol := "TCP" // default + + if pol.Port != nil { + port = pol.Port.IntValue() + } + + if pol.Protocol != nil { + protocol = string(*pol.Protocol) + } + + npLog.Infof("ingress policy port: protocol: %v, port: %v", protocol, port) + rules = append(rules, k8sPolicyPorts{Port: port, Protocol: protocol}) + } + return rules +} + +func (k8sNet *k8sContext) getIngressPodSelectorList(peers []v1beta1.NetworkPolicyPeer) ([]*k8sPodSelector, error) { + peerPodSelector := []*k8sPodSelector{} + + if len(peers) <= 0 { + return peerPodSelector, fmt.Errorf("empty pod selectors") + } + + for _, from := range peers { + if from.PodSelector != nil { + s, err := k8sNet.getPodSelector(from.PodSelector.MatchLabels) + // don't apply partial policy. + if err != nil { + return []*k8sPodSelector{}, err + } + npLog.Infof("ingress policy pod-selector: %+v", s) + peerPodSelector = append(peerPodSelector, s) + } + } + return peerPodSelector, nil +} + +func (k8sNet *k8sContext) getIngressPolicy(npIngress []v1beta1.NetworkPolicyIngressRule) ([]k8sIngress, error) { + ingressRules := []k8sIngress{} + + if len(npIngress) <= 0 { + return ingressRules, fmt.Errorf("no ingress rules") + } + + for _, policy := range npIngress { + rules := k8sNet.getPolicyPorts(policy.Ports) + if len(rules) <= 0 { + return ingressRules, fmt.Errorf("empty policy ports") + } + + fromPodSelector, err := k8sNet.getIngressPodSelectorList(policy.From) + // don't apply partial policy. + if err != nil { + return []k8sIngress{}, err + } + ingressRules = append(ingressRules, k8sIngress{IngressRules: rules, IngressPodSelector: fromPodSelector}) + } + return ingressRules, nil +} + +func (k8sNet *k8sContext) getPodSelector(m map[string]string) (*k8sPodSelector, error) { + PodSelector := k8sPodSelector{TenantName: defaultTenantName, NetworkName: defaultNetworkName, GroupName: defaultEpgName} + + // check tenant + if _, err := k8sNet.contivClient.TenantGet(PodSelector.TenantName); err != nil { + return nil, fmt.Errorf("tenant %s doesn't exist, %v", PodSelector.TenantName, err) + } + npLog.Info("Got tenant from Cotiv client") + + // check network + if _, err := k8sNet.contivClient.NetworkGet(PodSelector.TenantName, PodSelector.NetworkName); err != nil { + return nil, fmt.Errorf("network: +%v doesn't exist, %v", PodSelector, err) + } + npLog.Info("Got network from Cotiv client") + + podsList, err := k8sNet.k8sClientSet.CoreV1().Pods("kube-system").List(v1.ListOptions{LabelSelector: labels.SelectorFromSet(m).String()}) + if err != nil { + npLog.Fatalf("failed to get Pods from K8S Server, %v", err) + return nil, err + } + npLog.Info("Got Pods Ips info From APIS server") + for _, pod := range podsList.Items { + PodSelector.PodIps = append(PodSelector.PodIps, pod.Status.PodIP) + npLog.Info("Recv %s PodIp", pod.Status.PodIP) + } + npLog.Info("PodSelector %+v PodIp", PodSelector) + + return &PodSelector, err +} + +func (k8sNet *k8sContext) createDefaultPolicy(tenantName string, epgName string) error { + var err error + + if err = k8sNet.createPolicy(defaultTenantName, epgName); err != nil { + return err + } + + for _, direction := range []string{"in", "out"} { + if err = k8sNet.createRule(&client.Rule{ + TenantName: tenantName, + PolicyName: k8sutils.EpgNameToPolicy(epgName), + RuleID: k8sutils.DenyAllRuleID + direction, + Priority: k8sutils.DenyAllPriority, + Direction: direction, + Action: "allow", + }); err != nil { + return err + } + } + return nil +} diff --git a/utils/k8sutils/k8sutils.go b/utils/k8sutils/k8sutils.go index aefe83605..a0b9b0a61 100644 --- a/utils/k8sutils/k8sutils.go +++ b/utils/k8sutils/k8sutils.go @@ -7,6 +7,7 @@ import ( "github.com/contiv/client-go/kubernetes" k8sRest "github.com/contiv/client-go/rest" "io/ioutil" + "strconv" "strings" ) @@ -25,8 +26,29 @@ const ( contivKubeCfgFile = "/opt/contiv/config/contiv.json" defSvcSubnet = "10.254.0.0/16" tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" + // DenyAllRuleID default deny all rule id + DenyAllRuleID = "deny-all-0-" + // DenyAllPriority default deny all rule priority + DenyAllPriority = 1 + + // K8sTenantLabel k8s tenant label used by contiv + K8sTenantLabel = "io.contiv.tenant" + // K8sNetworkLabel k8s network label used by contiv + K8sNetworkLabel = "io.contiv.network" + // K8sGroupLabel k8s group label used by contiv + K8sGroupLabel = "io.contiv.net-group" ) +// EpgNameToPolicy generate policy name from endpoint group +func EpgNameToPolicy(epgName string) string { + return epgName + "-policy" +} + +// PolicyToRuleID generate rule id from policy details +func PolicyToRuleID(epgName string, protocol string, port int, direction string) string { + return epgName + "-" + protocol + "-" + strconv.Itoa(port) + "-" + direction +} + // GetK8SConfig reads and parses the contivKubeCfgFile func GetK8SConfig(pCfg *ContivConfig) error { bytes, err := ioutil.ReadFile(contivKubeCfgFile)