diff --git a/netmaster/k8snetwork/networkpolicy.go b/netmaster/k8snetwork/networkpolicy.go index 767880bfa..56f7f4f1f 100644 --- a/netmaster/k8snetwork/networkpolicy.go +++ b/netmaster/k8snetwork/networkpolicy.go @@ -1,58 +1,114 @@ package networkpolicy import ( - "encoding/json" "fmt" log "github.com/Sirupsen/logrus" "github.com/contiv/netplugin/contivmodel/client" "github.com/contiv/netplugin/utils/k8sutils" - "k8s.io/api/networking/v1" + v1 "k8s.io/api/core/v1" + network_v1 "k8s.io/api/networking/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "reflect" + "strconv" "strings" "time" ) const defaultTenantName = "default" -const defaultNetworkName = "net" +const defaultNetworkName = "default-net" const defaultSubnet = "10.1.2.0/24" //const defaultEpgName = "ingress-group" -const defaultEpgName = "default-epg" +//const defaultEpgName = "default-epg" +const defaultEpgName = "default-group" const defaultPolicyName = "ingress-policy" const defaultRuleID = "1" const defaultPolicyPriority = 2 type k8sPodSelector struct { - TenantName string - NetworkName string - PodIps []string - GroupName string + TenantName string //Attach Tenant + NetworkName string //Attach network + GroupName string //Attach EPG + PolicyName string //Attach to policy + labelPodMap map[string]map[string]bool + podIps map[string]string } type k8sPolicyPorts struct { Port int Protocol string } - +type k8sNameSelector struct { + nameSpaceSel string +} +type podCache struct { + labeSelector string + podNetwork string + podGroup string + podEpg string +} +type k8sIPBlockSelector struct { + subNetIps []string +} type k8sIngress struct { - IngressRules []k8sPolicyPorts - IngressPodSelector []*k8sPodSelector + IngressRules []k8sPolicyPorts + IngressPodSelector []*k8sPodSelector + IngressNameSelector *k8sNameSelector + IngressIpBlockSelector *k8sIPBlockSelector +} +type npPodInfo struct { + nameSpace string + labelSelectors []string + IP string //??? should care for ipv6 address } type k8sNetworkPolicy struct { PodSelector *k8sPodSelector Ingress []k8sIngress } +type labelPolicySpec struct { + policy []*k8sNetworkPolicy +} type k8sContext struct { - k8sClientSet *kubernetes.Clientset - contivClient *client.ContivClient - isLeader func() bool - networkPolicy map[string]k8sNetworkPolicy + k8sClientSet *kubernetes.Clientset + contivClient *client.ContivClient + isLeader func() bool + //Policy Obj per Policy Name + networkPolicy map[string]*k8sNetworkPolicy + //List of Rules Per Policy + policyRules map[string][]string + //List of Network configured + network map[string]string + //List of EPG configured + epgName map[string]string + //Default policy Per EPG + defaultPolicyPerEpg map[string]string + //List of Policy Per EPG + policyPerEpg map[string][]string + //Cache table for given Pods + //Policy Obj per Policy Name + nwPolicyPerNameSpace map[string][]*k8sNetworkPolicy } var npLog *log.Entry +func getNetworkInfo() string { + //XXX: Need expend this in version 2 + return defaultNetworkName +} +func getLabelDBkey(label, nameSpace string) string { + return label + nameSpace +} +func getEpgInfo() string { + //XXX: Need to expend in version 2 + return defaultEpgName +} +func getTenantInfo() string { + return defaultTenantName +} + +//Start Network Policy feature enabler func (k8sNet *k8sContext) handleK8sEvents() { for k8sNet.isLeader() != true { time.Sleep(time.Second * 10) @@ -70,10 +126,12 @@ func (k8sNet *k8sContext) handleK8sEvents() { } } +//Create network for given name string func (k8sNet *k8sContext) createNetwork(nwName string) error { npLog.Infof("create network %s", nwName) - - if _, err := k8sNet.contivClient.NetworkGet(defaultTenantName, nwName); err == nil { + if _, err := k8sNet.contivClient.NetworkGet( + defaultTenantName, + nwName); err == nil { return nil } @@ -88,18 +146,26 @@ func (k8sNet *k8sContext) createNetwork(nwName string) error { } for func() error { - _, err := k8sNet.contivClient.NetworkGet(defaultTenantName, nwName) + _, err := k8sNet.contivClient.NetworkGet( + defaultTenantName, + nwName) return err - }() != nil { //XXX:Should we really poll here ; there would be chances on genuine error and it cause infinity loop + }() != nil { + //XXX:Should we really poll here ; + //there would be chances on genuine error and + //it cause infinity loop time.Sleep(time.Millisecond * 100) } return nil } +//Delete given network from contiv system func (k8sNet *k8sContext) deleteNetwork(nwName string) error { npLog.Infof("delete network %s", nwName) - if _, err := k8sNet.contivClient.NetworkGet(defaultTenantName, nwName); err != nil { + if _, err := k8sNet.contivClient.NetworkGet( + defaultTenantName, + nwName); err != nil { return nil } @@ -110,7 +176,9 @@ func (k8sNet *k8sContext) deleteNetwork(nwName string) error { } for func() error { - _, err := k8sNet.contivClient.NetworkGet(defaultTenantName, nwName) + _, err := k8sNet.contivClient.NetworkGet( + defaultTenantName, + nwName) return err }() == nil { //XXX: Same here as above time.Sleep(time.Millisecond * 100) @@ -118,25 +186,26 @@ func (k8sNet *k8sContext) deleteNetwork(nwName string) error { return nil } -func (k8sNet *k8sContext) createEpg(nwName, epgName, policyName string) error { - npLog.Infof("create epg %s", epgName) - - if _, err := k8sNet.contivClient.EndpointGroupGet(defaultTenantName, epgName); err == nil { - return nil - } - +//Create EPG in context of given Network +func (k8sNet *k8sContext) createEpg( + nwName, + epgName string, + policy []string) error { + //npLog.Infof("create epg %s policy :%+v", epgName, policy) if err := k8sNet.contivClient.EndpointGroupPost(&client.EndpointGroup{ TenantName: defaultTenantName, NetworkName: nwName, GroupName: epgName, - Policies: []string{policyName}, + Policies: policy, }); err != nil { npLog.Errorf("failed to create epg %s, %s", epgName, err) return err } for func() error { - _, err := k8sNet.contivClient.EndpointGroupGet(defaultTenantName, epgName) + _, err := k8sNet.contivClient.EndpointGroupGet( + defaultTenantName, + epgName) return err }() != nil { //XXX: Same as above time.Sleep(time.Millisecond * 100) @@ -144,9 +213,32 @@ func (k8sNet *k8sContext) createEpg(nwName, epgName, policyName string) error { return nil } -func (k8sNet *k8sContext) deleteEpg(networkname, epgName, policyName string) error { +//Version 1 : Create default EPG at default-net network +func (k8sNet *k8sContext) createEpgInstance(nwName, epgName string) error { + var err error + policy := []string{defaultPolicyName} + if err = k8sNet.createDefaultPolicy( + defaultTenantName, + epgName); err != nil { + npLog.Errorf("failed Default ingress policy EPG %v: err:%v ", + epgName, err) + return err + } + if err = k8sNet.createEpg(nwName, epgName, policy); err != nil { + npLog.Errorf("failed to update EPG %v: err:%v ", epgName, err) + return err + } + k8sNet.epgName[epgName] = epgName + k8sNet.policyPerEpg[epgName] = policy + return err +} + +//Delete EPG from given network +func (k8sNet *k8sContext) deleteEpg(networkname, + epgName, policyName string) error { npLog.Infof("delete epg %s", epgName) - if _, err := k8sNet.contivClient.EndpointGroupGet(defaultTenantName, epgName); err != nil { + if _, err := k8sNet.contivClient. + EndpointGroupGet(defaultTenantName, epgName); err != nil { return nil } @@ -157,7 +249,8 @@ func (k8sNet *k8sContext) deleteEpg(networkname, epgName, policyName string) err } for func() error { - _, err := k8sNet.contivClient.EndpointGroupGet(defaultTenantName, epgName) + _, err := k8sNet.contivClient. + EndpointGroupGet(defaultTenantName, epgName) return err }() == nil { //Same as above time.Sleep(time.Millisecond * 100) @@ -165,15 +258,16 @@ func (k8sNet *k8sContext) deleteEpg(networkname, epgName, policyName string) err return nil } -func (k8sNet *k8sContext) createPolicy(tenantName string, epgName string) error { - policyName := k8sutils.EpgNameToPolicy(epgName) - - npLog.Infof("create policy: %s:%s", policyName, tenantName) - - if _, err := k8sNet.contivClient.PolicyGet(tenantName, policyName); err == nil { - return nil +//Create policy contiv system +func (k8sNet *k8sContext) createPolicy(tenantName string, + epgName, policyName string) error { + //policy := k8sutils.EpgNameToPolicy(epgName, policyName) + if _, err := k8sNet.contivClient. + PolicyGet(tenantName, policyName); err == nil { + npLog.Infof("Policy:%v found contiv", policyName) + return err } - + //npLog.Infof("Post policy:%v to ofnet", policyName) if err := k8sNet.contivClient.PolicyPost(&client.Policy{ TenantName: tenantName, PolicyName: policyName, @@ -191,34 +285,41 @@ func (k8sNet *k8sContext) createPolicy(tenantName string, epgName string) error return nil } +//Delete given policy from Contiv system func (k8sNet *k8sContext) deletePolicy(policyName string) error { npLog.Infof("delete policy %s", policyName) - if _, err := k8sNet.contivClient.PolicyGet(defaultTenantName, policyName); err != nil { + if _, err := k8sNet.contivClient. + PolicyGet(defaultTenantName, policyName); err != nil { return nil } - if err := k8sNet.contivClient.PolicyDelete(defaultTenantName, policyName); err != nil { + if err := k8sNet.contivClient. + PolicyDelete(defaultTenantName, policyName); err != nil { npLog.Errorf("failed to delete policy %s, %s", policyName, err) return err } for func() error { - _, err := k8sNet.contivClient.PolicyGet(defaultTenantName, policyName) + _, err := k8sNet.contivClient.PolicyGet(defaultTenantName, + policyName) return err - }() == nil { //XXX: Same as av + }() == nil { //XXX: time.Sleep(time.Millisecond * 100) } return nil } +//Post rule to contiv if not exist func (k8sNet *k8sContext) createRule(cRule *client.Rule) error { - npLog.Infof("create rule: %+v", *cRule) - if val, err := k8sNet.contivClient.RuleGet(cRule.TenantName, cRule.PolicyName, cRule.RuleID); err == nil { + if val, err := k8sNet.contivClient.RuleGet(cRule.TenantName, + cRule.PolicyName, cRule.RuleID); err == nil { if val.Action != cRule.Action { - k8sNet.deleteRule(cRule.TenantName, cRule.PolicyName, cRule.RuleID) + k8sNet.deleteRule(cRule.TenantName, + cRule.PolicyName, cRule.RuleID) } else { + npLog.Infof("Rule:%+v already exist", *cRule) return nil } } @@ -229,7 +330,8 @@ func (k8sNet *k8sContext) createRule(cRule *client.Rule) error { } for func() error { - _, err := k8sNet.contivClient.RuleGet(cRule.TenantName, cRule.PolicyName, cRule.RuleID) + _, err := k8sNet.contivClient.RuleGet(cRule.TenantName, + cRule.PolicyName, cRule.RuleID) return err }() != nil { time.Sleep(time.Millisecond * 100) @@ -237,128 +339,385 @@ func (k8sNet *k8sContext) createRule(cRule *client.Rule) error { return nil } -func (k8sNet *k8sContext) deleteRule(tenantName string, policyName, ruleID string) error { - npLog.Infof("delete rule: %s:%s", ruleID, policyName) +//Delete rule from contiv system +func (k8sNet *k8sContext) deleteRule(tenantName string, + policyName, ruleID string) error { + npLog.Infof("Delete rule: %s:%s", ruleID, policyName) - if _, err := k8sNet.contivClient.RuleGet(tenantName, policyName, ruleID); err != nil { + if _, err := k8sNet.contivClient. + RuleGet(tenantName, policyName, ruleID); err != nil { return nil } - if err := k8sNet.contivClient.RuleDelete(tenantName, policyName, ruleID); err != nil { - npLog.Errorf("failed to delete rule: %s:%s, %v", ruleID, policyName, err) + if err := k8sNet.contivClient. + RuleDelete(tenantName, policyName, ruleID); err != nil { + npLog.Errorf("Failure rule del Ops:%s:%s,%v", + ruleID, policyName, err) return err } for func() error { - _, err := k8sNet.contivClient.RuleGet(tenantName, policyName, ruleID) + _, err := k8sNet.contivClient. + RuleGet(tenantName, policyName, ruleID) return err }() == nil { time.Sleep(time.Millisecond * 100) } return nil } -func (k8sNet *k8sContext) getIsolationPolicy(annotations map[string]string) string { - var inPolicy struct { - Ingress map[string]string `json:"ingress"` - } - if inByte, ok := annotations["net.beta.kubernetes.io/network-policy"]; ok { - if err := json.Unmarshal([]byte(inByte), &inPolicy); err != nil { - npLog.Infof("no isolation policy in namespace [%s], %s", inByte, err) - return "allow" - } - } else { - return "none" +//Sub handler to process Network Policy event from K8s srv +func (k8sNet *k8sContext) processK8sNetworkPolicy( + opCode watch.EventType, np *network_v1.NetworkPolicy) { + if np.Namespace == "kube-system" { //not applicable for system namespace + return } - if policy, ok := inPolicy.Ingress["isolation"]; ok { - if strings.ToLower(policy) == strings.ToLower("DefaultDeny") { - return "deny" - } + npLog.Infof("Network Policy[%v]: %+v", opCode, *np) + switch opCode { + case watch.Added, watch.Modified: + k8sNet.addNetworkPolicy(np) + + case watch.Deleted: + k8sNet.delNetworkPolicy(np) } - return "allow" } -func (k8sNet *k8sContext) updateDefaultIngressPolicy(ns string, action string) { - nwName := ns + "-" + defaultNetworkName - policyName := ns + "-" + defaultPolicyName - epgName := ns + "-" + defaultEpgName - - var err error - - if err = k8sNet.createNetwork(nwName); err != nil { - npLog.Errorf("failed to update network %s, %s", nwName, err) +//Sub Handler to process Pods events from K8s srv +func (k8sNet *k8sContext) processK8sPods(opCode watch.EventType, pod *v1.Pod) { + if pod.Namespace == "kube-system" { //not applicable for system namespace return } + //K8s pods event doesn't provide Ips information in Add/delete type + //opcode + npLog.Infof("POD [%s],NameSpace[%v] ,Label:[%+v],IPs:[%v]", + opCode, pod.ObjectMeta.Namespace, + pod.ObjectMeta.Labels, pod.Status.PodIP) - if err = k8sNet.createPolicy(defaultTenantName, policyName); err != nil { - npLog.Errorf("failed to update policy %s, %s", policyName, err) + if _, ok := k8sNet. + nwPolicyPerNameSpace[pod.ObjectMeta.Namespace]; !ok { + npLog.Infof("Pod doesn't match policy namespace") return } + switch opCode { + case watch.Added, watch.Modified, watch.Deleted: + if pod.ObjectMeta.DeletionTimestamp != nil && + len(pod.Status.PodIP) > 0 { + //K8s Srv notify Pods delete case as part of modify + //event by specifying DeletionTimeStamp + // Pod Delete event doesn't carry Pod Ips info + //therefore Using Modify event to manipulate future + //delete event + k8sNet.processPodDeleteEvent(pod) + } else if len(pod.Status.PodIP) > 0 { + //Pod event without timeDeletion with Pod ip consider + //as pod add event + k8sNet.processPodAddEvent(pod) + } - if err = k8sNet.createEpg(nwName, epgName, policyName); err != nil { - npLog.Errorf("failed to update EPG %s, %s", epgName, err) - return } +} - if err = k8sNet.createRule(&client.Rule{ - TenantName: defaultTenantName, - PolicyName: policyName, - RuleID: defaultRuleID, - Priority: defaultPolicyPriority, - Direction: "in", - Action: "allow"}); err != nil { - npLog.Errorf("failed to update default rule, %s", err) - return +//Parse Pod Info from receive Pod events +func parsePodInfo(pod *v1.Pod) npPodInfo { + var pInfo npPodInfo + pInfo.nameSpace = pod.ObjectMeta.Namespace + for key, val := range pod.ObjectMeta.Labels { + pInfo.labelSelectors = + append(pInfo.labelSelectors, (key + "=" + val)) } + pInfo.IP = pod.Status.PodIP + return pInfo } -func (k8sNet *k8sContext) deleteDefaultIngressPolicy(ns string) { - nwName := ns + "-" + defaultNetworkName - policyName := ns + "-" + defaultPolicyName - epgName := ns + "-" + defaultEpgName +//Get Network Policy object sets which ToSpec labelMap information match +//with given pods labelMap +func (k8sNet *k8sContext) getMatchToSpecPartNetPolicy( + podInfo npPodInfo) []*k8sNetworkPolicy { + var toPartPolicy []*k8sNetworkPolicy + nwPolicy, ok := k8sNet.nwPolicyPerNameSpace[podInfo.nameSpace] + if !ok { + npLog.Warnf("No NetworkPolicy for NameSpace:%v", + podInfo.nameSpace) + return nil + } + for _, nwPol := range nwPolicy { + for _, label := range podInfo.labelSelectors { + //Collect networkPolicy object which match with pods + //Labels + if _, ok := nwPol.PodSelector.labelPodMap[label]; ok { + toPartPolicy = append(toPartPolicy, nwPol) + // npLog.Infof("policy :%+v", nwPol) + break + } + } + } + return toPartPolicy +} - var err error +//Get Network Policy object sets which FromSpec, labelMap information match +//with given pods labelMap +func (k8sNet *k8sContext) getMatchFromSpecPartNetPolicy( + podInfo npPodInfo) []*k8sNetworkPolicy { - if err = k8sNet.deleteRule(defaultTenantName, policyName, defaultRuleID); err != nil { - npLog.Errorf("failed to delete default rule, %s", err) - return + var fromPartPolicy []*k8sNetworkPolicy + //NetworkPolicy master object on pods Namespace + nwPolicy, ok := k8sNet.nwPolicyPerNameSpace[podInfo.nameSpace] + if !ok { + npLog.Infof("Pod namespace doesn't have any policy config") + return nil } + //Build list of networkPolicy object which fromSpec belongs to given + //pods Info + for _, l := range podInfo.labelSelectors { + for _, nwPol := range nwPolicy { + for _, ingress := range nwPol.Ingress { + //PodSelector on FromSpec part of policy Object + for _, podSelector := range ingress. + IngressPodSelector { + npLog.Infof("labelMap:%+v", + podSelector.labelPodMap) + if _, ok := + podSelector.labelPodMap[l]; ok { + fromPartPolicy = + append(fromPartPolicy, + nwPol) + break + } + } + } + } + } + return fromPartPolicy +} - if err = k8sNet.deleteEpg(nwName, epgName, policyName); err != nil { - npLog.Errorf("failed to delete EPG %s, %s", epgName, err) +//Consolidate all Ips belongs to Label for Pod Selector object +func (k8sNet *k8sContext) updatePodSelectorPodIps( + podSelector *k8sPodSelector) { + if podSelector == nil { + npLog.Errorf(" nil pod Selector reference") return } - if err = k8sNet.deletePolicy(policyName); err != nil { - npLog.Errorf("failed to delete policy %s, %s", policyName, err) - return + for _, ipMap := range podSelector.labelPodMap { + for ip := range ipMap { + podSelector.podIps[ip] = ip + } } + return } -func (k8sNet *k8sContext) processK8sNetworkPolicy(opCode watch.EventType, np *v1.NetworkPolicy) { - if np.Namespace == "kube-system" { // not applicable for system namespace - return +//Process Pod Delete Event from K8s Srv +func (k8sNet *k8sContext) processPodDeleteEvent(pod *v1.Pod) { + podInfo := parsePodInfo(pod) + labelList := podInfo.labelSelectors + rmIps := []string{podInfo.IP} + npLog.Infof("POD [Delete] for pods:%+v", rmIps) + //find All configured Network Policy object which given pods LableMap + //match + + toSpecNetPolicy := k8sNet.getMatchToSpecPartNetPolicy(podInfo) + if len(toSpecNetPolicy) > 0 { + for _, nw := range toSpecNetPolicy { + //remove Given Pods Ips from List + delete(nw.PodSelector.podIps, podInfo.IP) + //revisit all configur label Ips list in pod + //Selector object + k8sNet.getIpListMatchPodSelector(nw.PodSelector, + labelList, podInfo.IP) + //Remove Pods Ips from Label Map Table + //Update PodSelector podIps list + k8sNet.updatePodSelectorPodIps(nw.PodSelector) + + rList := k8sNet. + buildRulesFromIngressSpec(nw, + nw.PodSelector.PolicyName) + npLog.Infof("remove Pods ToSpec :%+v", rmIps) + ruleList := k8sNet. + finalIngressNetworkPolicyRule( + nw, rmIps, *rList, false) + npLog.Infof("Delete To Spec rule:%+v", ruleList) + npLog.Infof("rmove PodIps from ToSpec", podInfo.IP) + } + } else { //Pods belongs fromPart of Spec + fromPartPolicy := k8sNet.getMatchFromSpecPartNetPolicy(podInfo) + npLog.Infof("Delete Pod belong FromSec part:%+v", + fromPartPolicy) + if len(fromPartPolicy) > 0 { + npLog.Infof("remove PodIps:%v fromSpec part of Policy", + rmIps) + for _, nw := range fromPartPolicy { + k8sNet.rmIpFromSpecPodSelector(nw, + labelList, podInfo.IP) + rList := k8sNet. + buildIngressRuleToPodSelector(nw, rmIps, + nw.PodSelector.PolicyName) + npLog.Infof("Ingress Rule :%+v", *rList) + npLog.Infof("Pods Info in To Spec :%+v", rmIps) + ipList := getIpMapToSlice(nw.PodSelector.podIps) + ruleList := k8sNet. + finalIngressNetworkPolicyRule(nw, + ipList, *rList, false) + npLog.Infof("Pod rules:%+v", ruleList) + } + } + } +} +func getIpMapToSlice(m map[string]string) []string { + ips := []string{} + for ip := range m { + ips = append(ips, ip) + } + return ips +} +func (k8sNet *k8sContext) UpdateIpListFromSpecfromLabel(nw *k8sNetworkPolicy, + label []string, ip string) { + for _, ingress := range nw.Ingress { + for _, podSelector := range ingress.IngressPodSelector { + for _, l := range label { + if ipMap, ok := + podSelector.labelPodMap[l]; ok { + ipMap[ip] = true + } + } + //Rebuild PodSelector PodIps + k8sNet.updatePodSelectorPodIps(podSelector) + npLog.Infof("Update PodIps into FromSpecPod:%+v", + podSelector) + } } + return +} - npLog.Infof("process [%s] network policy %+v", opCode, np) +//Remove Give Pod Ips fromSpec Object of Network Policy +func (k8sNet *k8sContext) rmIpFromSpecPodSelector( + nw *k8sNetworkPolicy, label []string, ip string) { + for _, ingress := range nw.Ingress { + for _, podSelector := range ingress.IngressPodSelector { + npLog.Infof("podSelector:%+v", podSelector) + //remove ips from PodSelector Object + delete(podSelector.podIps, ip) + for _, l := range label { + if ipMap, ok := + podSelector.labelPodMap[l]; ok { + delete(ipMap, ip) + npLog.Infof("remove Pod Ips:%v FromSpec map:%+v", + ip, ipMap) + } + } + //Rebuild PodSelector PodIps + k8sNet.updatePodSelectorPodIps(podSelector) + } + } + return +} - switch opCode { - case watch.Added: - npLog.Infof("Recv [%s] network policy event", opCode, np) - k8sNet.addNetworkPolicy(np) - case watch.Modified: - case watch.Deleted: +//Add Pods Ips and readjuct Pod selector podIps list +func (k8sNet *k8sContext) addPodIpsToSpecPodSelector(nw *k8sNetworkPolicy, + label []string, ip string) { + for _, l := range label { + if ipMap, ok := nw.PodSelector.labelPodMap[l]; ok { + ipMap[ip] = true + } } + //Recalculate PodSelector PodIps list + for _, lMap := range nw.PodSelector.labelPodMap { + //At each Label Walk all its Ips + for ip := range lMap { + nw.PodSelector.podIps[ip] = ip + } + } + return +} + +//return list of Ips which belongs to given lable in PodSelector Object +func (k8sNet *k8sContext) getIpListMatchPodSelector(podSelector *k8sPodSelector, + label []string, ip string) { + for _, l := range label { + if ipMap, ok := podSelector.labelPodMap[l]; ok { + ipMap[ip] = true + } + } + return } -func (k8sNet *k8sContext) processK8sEvent(opCode watch.EventType, eventObj interface{}) { +//Process Pod Add event +func (k8sNet *k8sContext) processPodAddEvent(pod *v1.Pod) { + if pod.Status.PodIP == "" { + return + } + //Get Pods Ips and its Pod selector label + podInfo := parsePodInfo(pod) + npLog.Infof("POD [ADD] request for pod %+v", podInfo) + //get programmed NetworkPolicy for recv Pod Namespace + //find All configured Policy which is having given pods Label selector + // is part of To spec + toPartPolicy := k8sNet.getMatchToSpecPartNetPolicy(podInfo) + npLog.Infof("ToPartSpec:%+v", toPartPolicy) + //Pods Belongs to To Spec part + podIps := []string{podInfo.IP} + + if len(toPartPolicy) > 0 { + npLog.Infof("Recv Pod belongs to ToSpec part of Policy") + for _, nw := range toPartPolicy { + rList := k8sNet.buildRulesFromIngressSpec(nw, + nw.PodSelector.PolicyName) + if len(*rList) > 0 { + npLog.Infof("Pods Info in To Spec :%+v", + nw.PodSelector.podIps) + if _, ok := nw.PodSelector.podIps[podInfo.IP]; ok { + npLog.Infof("pod Ips already exist", podInfo.IP) + continue + } + ruleList := k8sNet.finalIngressNetworkPolicyRule( + nw, podIps, *rList, true) + npLog.Infof("To Spec Pod rules:%+v", ruleList) + npLog.Infof("podInf.labelSelectors:%+v", + podInfo.labelSelectors) + } + k8sNet.addPodIpsToSpecPodSelector(nw, + podInfo.labelSelectors, podInfo.IP) + } + } else { + //Build fromPodSelector List + fromPartPolicy := k8sNet.getMatchFromSpecPartNetPolicy(podInfo) + //Build Rules and update to OVS + for _, nw := range fromPartPolicy { + npLog.Infof("fromPartPolicy:%+v", *nw) + rList := k8sNet.buildIngressRuleToPodSelector(nw, + podIps, + nw.PodSelector.PolicyName) + if len(*rList) > 0 { + npLog.Infof("Ingress Rule :%+v", *rList) + npLog.Infof("Pods Info in To Spec :%+v", + nw.PodSelector.podIps) + ipList := getIpMapToSlice(nw.PodSelector.podIps) + ruleList := k8sNet.finalIngressNetworkPolicyRule(nw, + ipList, *rList, true) + npLog.Infof("Pod rules:%+v", ruleList) + } + k8sNet.UpdateIpListFromSpecfromLabel(nw, + podInfo.labelSelectors, podInfo.IP) + } + } +} + +//Handler to process APIs Server Watch event +func (k8sNet *k8sContext) processK8sEvent(opCode watch.EventType, + eventObj interface{}) { + //Only Leader will process events if k8sNet.isLeader() != true { return } + switch objType := eventObj.(type) { - case *v1.NetworkPolicy: + case *v1.Pod: + k8sNet.processK8sPods(opCode, objType) + case *network_v1.NetworkPolicy: k8sNet.processK8sNetworkPolicy(opCode, objType) + default: + npLog.Infof("Unwanted event from K8s evType:%v objType:%v", + opCode, objType) } } @@ -369,7 +728,7 @@ func (k8sNet *k8sContext) watchK8sEvents(errChan chan error) { for k8sNet.isLeader() != true { time.Sleep(time.Millisecond * 100) } - + //Set Watcher for Network Policy resource npWatch, err := k8sNet.k8sClientSet.Networking().NetworkPolicies("").Watch(meta_v1.ListOptions{}) if err != nil { errChan <- fmt.Errorf("failed to watch network policy, %s", err) @@ -378,7 +737,12 @@ func (k8sNet *k8sContext) watchK8sEvents(errChan chan error) { selCase = append(selCase, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(npWatch.ResultChan())}) + //Set watcher for Pods resource + podWatch, _ := k8sNet.k8sClientSet.CoreV1(). + Pods("").Watch(meta_v1.ListOptions{}) + selCase = append(selCase, reflect.SelectCase{Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(podWatch.ResultChan())}) for { _, recVal, ok := reflect.Select(selCase) if !ok { @@ -417,181 +781,368 @@ func InitK8SServiceWatch(listenURL string, isLeader func() bool) error { npLog.Fatalf("failed to init K8S client, %v", err) return err } - kubeNet := k8sContext{contivClient: contivClient, k8sClientSet: k8sClientSet, isLeader: isLeader} + //nwoPolicyDb := make(map[string]k8sNetworkPolicy, 0) + kubeNet := k8sContext{ + contivClient: contivClient, + k8sClientSet: k8sClientSet, + isLeader: isLeader, + networkPolicy: make(map[string]*k8sNetworkPolicy, 0), + //lookup table for Configured Network; + network: make(map[string]string, 0), + //lookup table for Configured Policy per EPG + defaultPolicyPerEpg: make(map[string]string, 0), + epgName: make(map[string]string, 0), + policyPerEpg: make(map[string][]string, 0), + policyRules: make(map[string][]string, 0), + nwPolicyPerNameSpace: make(map[string][]*k8sNetworkPolicy, 0), + } + + //Trigger default epg : = default-group + kubeNet.createEpgInstance(defaultNetworkName, defaultEpgName) go kubeNet.handleK8sEvents() return nil } +func getLabelSelector(key, val string) string { + return (key + "=" + val) +} -func (k8sNet *k8sContext) addNetworkPolicy(np *v1beta1.NetworkPolicy) { - //Get all pods which belongs to given label selector - npPodSelector, err := k8sNet.getPodSelector(np.Spec.PodSelector.MatchLabels) +func (k8sNet *k8sContext) addNetworkPolicy(np *network_v1.NetworkPolicy) { + //check if given Policy already exist + if _, ok := k8sNet.networkPolicy[np.Name]; ok { + npLog.Warnf("Delete existing network policy: %s !", np.Name) + k8sNet.delNetworkPolicy(np) + } + //build ToSpec PodSelector Obj + npPodSelector, err := k8sNet.parsePodSelectorInfo( + np.Spec.PodSelector.MatchLabels, + np.Namespace) if err != nil { npLog.Warnf("ignore network policy: %s, %v", np.Name, err) return } - npLog.Infof("network policy [%s] pod-selector: %+v", np.Name, npPodSelector) - IngressRules, err := k8sNet.getIngressPolicy(np.Spec.Ingress) + //Set policy name ToSpec podSelector Obj + npPodSelector.PolicyName = np.Name + //Save recv Label map info + npLog.Infof("Network policy [%s] pod-selector: %+v", + np.Name, npPodSelector) + //npLog.Infof("Recv [%s] Ingress_Policy from K8s: %+v", + // np.Name, np.Spec.Ingress) + + //Parse Ingress Policy rules + IngressRules, err := + k8sNet.parseIngressPolicy(np.Spec.Ingress, + np.Namespace) if err != nil { npLog.Warnf("ignore network policy: %s, %v", np.Name, err) return } - npLog.Infof("network Policy [%s] IngressPolicy: %+v", np.Name, IngressRules) - if _, ok := k8sNet.networkPolicy[np.Name]; ok { - npLog.Warnf("delete existing network policy: %s !", np.Name) - k8sNet.deleteNetworkPolicy(np) - } + nwPolicy := k8sNetworkPolicy{ + PodSelector: npPodSelector, + Ingress: IngressRules} - nwPolicy := k8sNetworkPolicy{PodSelector: npPodSelector, Ingress: IngressRules} - - npLog.Info("Going to Send Network Policy %+v", nwPolicy) + npLog.Info("Apply Network Policy[%s]: %+v, %+v", np.Name, + nwPolicy.PodSelector, nwPolicy.Ingress) + //Push policy info to ofnet agent if err := k8sNet.applyContivNetworkPolicy(&nwPolicy); err != nil { - npLog.Errorf("[%s] failed to configure policy, %v", np.Name, err) + npLog.Errorf("[%s] failed to configure policy, %v", + np.Name, err) return } + //Cache configued NetworkPolicy obj using policy Name + k8sNet.networkPolicy[np.Name] = &nwPolicy + //cache networkPolicy obj per namespace + k8sNet.nwPolicyPerNameSpace[np.Namespace] = + append(k8sNet.nwPolicyPerNameSpace[np.Name], &nwPolicy) + npLog.Infof("Add network policy in per NameSpace:%v", + k8sNet.nwPolicyPerNameSpace[np.Namespace]) +} + +//Build partial rule list using FromSpec PodSelector information +func (k8sNet *k8sContext) buildRulesFromIngressSpec( + np *k8sNetworkPolicy, + policyName string) (lRules *[]client.Rule) { + var listRules []client.Rule + for _, ingress := range np.Ingress { + isPortsCfg := false + if len(ingress.IngressRules) > 0 { + isPortsCfg = true + //Is Port Cfg included into From Ingress Spec + } + for _, podSec := range ingress.IngressPodSelector { + for _, fromIp := range podSec.podIps { + rule := client.Rule{ + TenantName: np.PodSelector.TenantName, + PolicyName: np.PodSelector.PolicyName, + FromIpAddress: fromIp, + Priority: defaultPolicyPriority, + Direction: "in", + Action: "allow"} + //If Port cfg enable + if isPortsCfg { + for _, p := range ingress.IngressRules { + k8sNet.appendPolicyPorts(&rule, p) + listRules = append(listRules, rule) + } + } else { + listRules = append(listRules, rule) + } + } + } + } + return &listRules +} - k8sNet.networkPolicy[np.Name] = nwPolicy +//Build Partial Rules based on FromSpec Pod IPs list +func (k8sNet *k8sContext) buildIngressRuleToPodSelector( + np *k8sNetworkPolicy, //Network Policy object + from []string, //FromSpec Ips List + policyName string) (lRules *[]client.Rule) { + //npLog.Infof("From:%v", from) + var listRules []client.Rule + //Walk info Ingress Policy FromSpec PodSelector + for _, ingress := range np.Ingress { + isPortsCfg := false + if len(ingress.IngressRules) > 0 { + isPortsCfg = true + //Is Port Cfg included into From Ingress Spec + } + //Attach All fromSpec IpsList + for _, fromIp := range from { + rule := client.Rule{ + TenantName: np.PodSelector.TenantName, + PolicyName: np.PodSelector.PolicyName, + FromIpAddress: fromIp, + Priority: defaultPolicyPriority, + Direction: "in", + Action: "allow"} + if isPortsCfg { + for _, port := range ingress.IngressRules { + //Add port Info into Rule + k8sNet.appendPolicyPorts(&rule, port) + listRules = append(listRules, rule) + } + } else { + listRules = append(listRules, rule) + } + } + } + return &listRules } -func (k8sNet *k8sContext) applyContivNetworkPolicy(np *k8sNetworkPolicy) error { - var err error +//final Build Rules by linking from spec to To spec podSelector +func (k8sNet *k8sContext) finalIngressNetworkPolicyRule(np *k8sNetworkPolicy, + toPodIPs []string, + ingressRules []client.Rule, + isAdd bool) *[]client.Rule { - // don't configure from multiple masters - if k8sNet.isLeader() != true { - return err + var err error + ruleList := ingressRules + + policyCtx := k8sNet.policyRules[np.PodSelector.PolicyName] + + //Ingress Spec To section Pods + for _, toIps := range toPodIPs { + //npLog.Infof("ruleList:%v", ruleList) + //Rebuild Rule List to add To Ips + for _, rule := range ruleList { + //Update To src Ip section in Rule + rule.ToIpAddress = toIps + //Generate RuleID :XXX: Should look for better approach + rule.RuleID = k8sutils.PolicyToRuleIDUsingIps( + toIps, rule.FromIpAddress, + rule.Port, rule.Protocol, + np.PodSelector.PolicyName) + + npLog.Infof("RulID:%v", rule.RuleID) + //Update Policy Name cache with policy Id + if isAdd { + if err = k8sNet.createRule(&rule); err != nil { + npLog.Errorf("failed: rules in-policy %+v, %v", + np.PodSelector, err) + return nil + } + //Update RuleID in cache Db + //XXX:Should use Hash Set instead of slice + // to aviod duplicate Ruleid insertion + policyCtx = append(policyCtx, rule.RuleID) + } else { //Policy Delete + if err = k8sNet.deleteRule(rule.TenantName, + rule.PolicyName, + rule.RuleID); err != nil { + npLog.Errorf("failed: del in-policy %+v, %v", + np.PodSelector, err) + return nil + } + for idx, r := range policyCtx { + if r == rule.RuleID { + policyCtx = append(policyCtx[0:idx], + policyCtx[idx+1:]...) + } + } + } + } + } + if len(policyCtx) > 0 { + k8sNet.policyRules[np.PodSelector.PolicyName] = policyCtx + } else { + delete(k8sNet.policyRules, np.PodSelector.PolicyName) } + return &ruleList +} +//Build policy ,rules and attached it to EPG +func (k8sNet *k8sContext) applyContivNetworkPolicy(np *k8sNetworkPolicy) error { + var err error // reset policy to deny on any error policyResetOnErr := func(tenantName, groupName string) { if err != nil { //k8sNet.resetPolicy(tenantName, groupName) + npLog.Warnf("Need to reset the policy") } } - - // policy - if err = k8sNet.createDefaultPolicy(np.PodSelector.TenantName, np.PodSelector.GroupName); err != nil { - npLog.Errorf("failed to create policy %+v, %v", np.PodSelector, err) - return err + //Check if EPG already created or not + if _, ok := k8sNet.epgName[np.PodSelector.GroupName]; !ok { + //Create EPG then + npLog.Infof("EPG :%v doesn't exist create now", + np.PodSelector.GroupName) + if err = k8sNet.createEpgInstance( + np.PodSelector.NetworkName, + np.PodSelector.GroupName); err != nil { + npLog.Errorf("failed to create EPG %s ", err) + return err + } + } + //Existing policy in EPG + attachPolicy := k8sNet.policyPerEpg[np.PodSelector.GroupName] + //Check if Policy is already programmed in EPG or not + if _, ok := k8sNet.networkPolicy[np.PodSelector.PolicyName]; !ok { + //Create Policy and published to ofnet controller + if err = k8sNet.createPolicy( + defaultTenantName, + np.PodSelector.GroupName, + np.PodSelector.PolicyName); err != nil { + npLog.Errorf("Failed to create Policy :%v", + np.PodSelector.PolicyName) + return err + } + k8sNet.networkPolicy[np.PodSelector.PolicyName] = np + + attachPolicy = append(attachPolicy, np.PodSelector.PolicyName) + //Update EPG with New Policy + if err = k8sNet.createEpg( + np.PodSelector.NetworkName, + np.PodSelector.GroupName, attachPolicy); err != nil { + npLog.Errorf("failed to update EPG %s ", err) + return err + } + //Attach new Policy into existing EPG + k8sNet.policyPerEpg[np.PodSelector.GroupName] = attachPolicy + } else { + //XXX: Need check if policy rules are still same or not + npLog.Warnf("Policy:%v already exist ", + np.PodSelector.PolicyName) } - defer policyResetOnErr(np.PodSelector.TenantName, np.PodSelector.GroupName) + defer policyResetOnErr( + np.PodSelector.TenantName, + np.PodSelector.GroupName) - // src epg - if _, err := k8sNet.contivClient.EndpointGroupGet(np.PodSelector.TenantName, - np.PodSelector.GroupName); err != nil { - npLog.Infof("epg: %+v doesn't exist", np.PodSelector) - return nil - } - npLog.Info("Got resp from Contiv EndPoint %+v", np.PodSelector) + //Build Ingress rules list + rList := k8sNet.buildRulesFromIngressSpec(np, + np.PodSelector.PolicyName) + npLog.Infof("Build Rules Ingress Spec:%+v, rList:%+v", + np.Ingress, rList) - // Add epg and rules - for _, ingress := range np.Ingress { - for _, from := range ingress.IngressPodSelector { - // from/to epgs - if _, err := k8sNet.contivClient.EndpointGroupGet(from.TenantName, from.GroupName); err != nil { - npLog.Infof("epg: %+v doesn't exist", from) - return nil - } - npLog.Info("From EndPoint xxContiv EndPoint %+v", from) - - // rules - for _, port := range ingress.IngressRules { - npLog.Infof("configure contiv policy: %+v", port) - for _, FromIP := range from.PodIps { - for _, ToIP := range np.PodSelector.PodIps { - ruleId := k8sutils.PolicyToRuleID(from.GroupName, port.Protocol, port.Port, "in") - if err = k8sNet.createRule(&client.Rule{ - TenantName: np.PodSelector.TenantName, - PolicyName: k8sutils.EpgNameToPolicy(np.PodSelector.GroupName), - FromIpAddress: FromIP, - ToIpAddress: ToIP, - RuleID: ruleId, - Protocol: strings.ToLower(port.Protocol), - Priority: defaultPolicyPriority, - Port: port.Port, - Direction: "in", - Action: "allow"}); err != nil { - npLog.Errorf("failed to create rules in in-policy %+v, %v", np.PodSelector, err) - return err - } - } - } - } - } - } + ipList := getIpMapToSlice(np.PodSelector.podIps) + + npLog.Infof("Pods Info in To Spec :%+v", np.PodSelector.podIps) + ruleList := k8sNet.finalIngressNetworkPolicyRule(np, ipList, + *rList, true) + npLog.Infof("final rules :%v", ruleList) + policyCtx := k8sNet.policyRules[np.PodSelector.PolicyName] + npLog.Infof("All rules:%+v", policyCtx) return nil } -func (k8sNet *k8sContext) deleteNetworkPolicy(np *v1beta1.NetworkPolicy) { +func (k8sNet *k8sContext) appendPolicyPorts(rules *client.Rule, ports k8sPolicyPorts) { + if len(ports.Protocol) > 0 { + rules.Protocol = strings.ToLower(ports.Protocol) + } + if ports.Port != 0 { + rules.Port = ports.Port + } + return +} +func (k8sNet *k8sContext) createUpdateRuleIds(rules *client.Rule) string { + ruleID := rules.FromIpAddress + rules.ToIpAddress + if rules.Port != 0 { + ruleID += strconv.Itoa(rules.Port) + } + if len(rules.Protocol) > 0 { + ruleID += rules.Protocol + } + rules.RuleID = ruleID + return ruleID +} +func (k8sNet *k8sContext) delNetworkPolicy(np *network_v1.NetworkPolicy) { npLog.Infof("delete network policy: %s", np.Name) policy, ok := k8sNet.networkPolicy[np.Name] if !ok { - npLog.Errorf("network policy: %s is not found", np.Name) + npLog.Errorf("network policy: %s doesn't exist", np.Name) return } - if err := k8sNet.cleanupContivNetworkPolicy(&policy); err != nil { + if err := k8sNet.cleanupContivNetworkPolicy(policy); err != nil { npLog.Errorf("failed to delete network policy: %s, %v", np.Name, err) return } //Remove PolicyId from Policy Db delete(k8sNet.networkPolicy, np.Name) - } func (k8sNet *k8sContext) cleanupContivNetworkPolicy(np *k8sNetworkPolicy) error { var retErr error - - // don't configure from multiple masters - if k8sNet.isLeader() != true { - return nil + policyName := np.PodSelector.PolicyName + npLog.Infof("Cleanup policy rule req:%+v and rules:%v", policyName, k8sNet.policyRules) + for _, ruleID := range k8sNet.policyRules[policyName] { //Walk for all configured Rules + npLog.Infof("Cleanup RulID:%v from policy:%v", ruleID, policyName) + if err := k8sNet.deleteRule(np.PodSelector.TenantName, policyName, ruleID); err != nil { + npLog.Warnf("failed to delete policy: %s rule: %s, %v", + policyName, ruleID, err) + retErr = err + } } - policyName := k8sutils.EpgNameToPolicy(np.PodSelector.GroupName) - for _, ingress := range np.Ingress { - for _, from := range ingress.IngressPodSelector { - for _, port := range ingress.IngressRules { - for _, direction := range []string{"in", "out"} { - ruleID := k8sutils.PolicyToRuleID(from.GroupName, port.Protocol, - port.Port, direction) - policyName := k8sutils.EpgNameToPolicy(np.PodSelector.GroupName) - - if err := k8sNet.deleteRule(np.PodSelector.TenantName, policyName, ruleID); err != nil { - npLog.Warnf("failed to delete policy: %s rule: %s, %v", - policyName, ruleID, err) - retErr = err - // try deleting other config - } - } - } - if err := k8sNet.deleteEpg(from.TenantName, from.GroupName, policyName); err != nil { - npLog.Warnf("failed to delete epg: %+v", from) - retErr = err - } else { - if err := k8sNet.deletePolicy(policyName); err != nil { - npLog.Warnf("failed to delete policy: %s:%s", from.TenantName, from.GroupName) - retErr = err - } - } + policyPerEpg := k8sNet.policyPerEpg[np.PodSelector.GroupName] + for index, policy := range policyPerEpg { + if policy == policyName { + policyPerEpg = append(policyPerEpg[:index], + policyPerEpg[index+1:]...) + break } } - - // delete pod selector epg - if err := k8sNet.deleteEpg(np.PodSelector.TenantName, np.PodSelector.GroupName, policyName); err != nil { - npLog.Warnf("failed to delete epg: %+v", np.PodSelector) + k8sNet.policyPerEpg[np.PodSelector.GroupName] = policyPerEpg + //Unlink Policy From EPG + if err := k8sNet.createEpg(np.PodSelector.NetworkName, + np.PodSelector.GroupName, policyPerEpg); err != nil { + npLog.Errorf("failed to update EPG %s, %s", + np.PodSelector.GroupName, err) + retErr = err + } + //Delete Policy + if err := k8sNet.deletePolicy(policyName); err != nil { + npLog.Warnf("failed to delete policy: %s", + np.PodSelector.TenantName) retErr = err - } else { - if err := k8sNet.deletePolicy(policyName); err != nil { - npLog.Warnf("failed to delete policy: %s", np.PodSelector) - retErr = err - } } - return retErr } -func (k8sNet *k8sContext) getPolicyPorts(policyPort []v1beta1.NetworkPolicyPort) []k8sPolicyPorts { +//parse policy Ports information +func (k8sNet *k8sContext) getPolicyPorts( + policyPort []network_v1.NetworkPolicyPort) []k8sPolicyPorts { rules := []k8sPolicyPorts{} for _, pol := range policyPort { @@ -601,109 +1152,241 @@ func (k8sNet *k8sContext) getPolicyPorts(policyPort []v1beta1.NetworkPolicyPort) if pol.Port != nil { port = pol.Port.IntValue() } - if pol.Protocol != nil { protocol = string(*pol.Protocol) } - - npLog.Infof("ingress policy port: protocol: %v, port: %v", protocol, port) - rules = append(rules, k8sPolicyPorts{Port: port, Protocol: protocol}) + npLog.Infof("ingress policy port: protocol: %v, port: %v", + protocol, port) + rules = append(rules, + k8sPolicyPorts{Port: port, + Protocol: protocol}) } return rules } -func (k8sNet *k8sContext) getIngressPodSelectorList(peers []v1beta1.NetworkPolicyPeer) ([]*k8sPodSelector, error) { +func (k8sNet *k8sContext) getIngressPodSelectorList( + peers []network_v1.NetworkPolicyPeer, + nameSpace string) ([]*k8sPodSelector, error) { peerPodSelector := []*k8sPodSelector{} + npLog.Infof("Recv PeerInfo:= %+v", peers) + if len(peers) <= 0 { return peerPodSelector, fmt.Errorf("empty pod selectors") } for _, from := range peers { if from.PodSelector != nil { - s, err := k8sNet.getPodSelector(from.PodSelector.MatchLabels) + s, err := k8sNet.parsePodSelectorInfo( + from.PodSelector.MatchLabels, nameSpace) // don't apply partial policy. if err != nil { return []*k8sPodSelector{}, err } - npLog.Infof("ingress policy pod-selector: %+v", s) + npLog.Infof("Ingress policy pod-selector: %+v", s) peerPodSelector = append(peerPodSelector, s) } } return peerPodSelector, nil } -func (k8sNet *k8sContext) getIngressPolicy(npIngress []v1beta1.NetworkPolicyIngressRule) ([]k8sIngress, error) { - ingressRules := []k8sIngress{} +//Build Ingress Policy obj +func (k8sNet *k8sContext) parseIngressPolicy( + npIngress []network_v1.NetworkPolicyIngressRule, + nameSpace string) ([]k8sIngress, error) { + ingressRules := []k8sIngress{} + //npLog.Infof("Recv Ingress Policy:=%+v", npIngress) if len(npIngress) <= 0 { return ingressRules, fmt.Errorf("no ingress rules") } - + //Walk in all received Ingress Policys for _, policy := range npIngress { rules := k8sNet.getPolicyPorts(policy.Ports) - if len(rules) <= 0 { - return ingressRules, fmt.Errorf("empty policy ports") - } - - fromPodSelector, err := k8sNet.getIngressPodSelectorList(policy.From) - // don't apply partial policy. + //build Ingress PodSelector obj + fromPodSelector, err := k8sNet. + getIngressPodSelectorList(policy.From, nameSpace) if err != nil { return []k8sIngress{}, err } - ingressRules = append(ingressRules, k8sIngress{IngressRules: rules, IngressPodSelector: fromPodSelector}) + //npLog.Infof("fromPodSelector:%+v", fromPodSelector) + ingressRules = append(ingressRules, + k8sIngress{IngressRules: rules, + IngressPodSelector: fromPodSelector}) } return ingressRules, nil } -func (k8sNet *k8sContext) getPodSelector(m map[string]string) (*k8sPodSelector, error) { - PodSelector := k8sPodSelector{TenantName: defaultTenantName, NetworkName: defaultNetworkName, GroupName: defaultEpgName} - - // check tenant - if _, err := k8sNet.contivClient.TenantGet(PodSelector.TenantName); err != nil { - return nil, fmt.Errorf("tenant %s doesn't exist, %v", PodSelector.TenantName, err) +func (k8sNet *k8sContext) getPodsIpsSetUsingLabel(m map[string]string, + nameSpace string) ([]string, error) { + var ipList []string + // labels.Parser + labelSectorStr := labels.SelectorFromSet(labels.Set(m)).String() + //Quary to K8s Api server for pods of given Label selector + podsList, err := k8sNet.k8sClientSet.CoreV1(). + Pods(nameSpace). + List(meta_v1.ListOptions{LabelSelector: labelSectorStr}) + if err != nil { + npLog.Fatalf("failed to get Pods from K8S Server, %v", err) + return nil, err } - npLog.Info("Got tenant from Cotiv client") - - // check network - if _, err := k8sNet.contivClient.NetworkGet(PodSelector.TenantName, PodSelector.NetworkName); err != nil { - return nil, fmt.Errorf("network: +%v doesn't exist, %v", PodSelector, err) + for _, pod := range podsList.Items { + ipList = append(ipList, pod.Status.PodIP) } - npLog.Info("Got network from Cotiv client") + return ipList, nil +} +func (k8sNet *k8sContext) initPodSelectorCacheTbl(m map[string]string, + podSelector *k8sPodSelector) error { + if podSelector == nil { + return fmt.Errorf("Passe Nil Pod Selector reference") + } + //XXX:Don't confused PodSelector with Pod: PodSelector object keeps all + //attched label Ips + if len(podSelector.podIps) <= 0 { + podSelector.podIps = make(map[string]string, 0) + npLog.Infof("Init PodSelector podIp table:%v", + podSelector.labelPodMap) + } + //PodSelector: Keep track of all its label + if len(podSelector.labelPodMap) <= 0 { + podSelector.labelPodMap = make(map[string]map[string]bool, 0) + for key, val := range m { + lkey := getLabelSelector(key, val) + podSelector.labelPodMap[lkey] = make(map[string]bool, 0) + } + npLog.Infof("Init PodSelector Map table:%v", + podSelector.labelPodMap) + } + return nil +} - podsList, err := k8sNet.k8sClientSet.CoreV1().Pods("kube-system").List(v1.ListOptions{LabelSelector: labels.SelectorFromSet(m).String()}) +//Create podSelector object and Init its attributes i.e podIps , label etc +func (k8sNet *k8sContext) parsePodSelectorInfo(m map[string]string, + nameSpace string) (*k8sPodSelector, error) { + + PodSelector := k8sPodSelector{ + TenantName: getTenantInfo(), + NetworkName: getNetworkInfo(), + GroupName: getEpgInfo()} + + npLog.Infof("Recv label:%+v", m) + + // check tenant from Contiv Api Server + // if _, err := + // k8sNet.contivClient.TenantGet(PodSelector.TenantName); err != nil { + // return nil, fmt.Errorf("tenant %s doesn't exist, %v", + // PodSelector.TenantName, err) + // } + // check network from Contiv Api Server + //if _, err := k8sNet.contivClient.NetworkGet( + // PodSelector.TenantName, PodSelector.NetworkName); err != nil { + // return nil, fmt.Errorf("network:%+v doesn't exist %v", + // PodSelector, err) + //} + // labels.Parser + labelSectorStr := labels.SelectorFromSet(labels.Set(m)).String() + //Quary to K8s Api server for pods of given Label selector + podsList, err := k8sNet.k8sClientSet.CoreV1(). + Pods(nameSpace). + List(meta_v1.ListOptions{LabelSelector: labelSectorStr}) if err != nil { npLog.Fatalf("failed to get Pods from K8S Server, %v", err) return nil, err } - npLog.Info("Got Pods Ips info From APIS server") + + if err = k8sNet.initPodSelectorCacheTbl(m, &PodSelector); err != nil { + return nil, err + } + //Build mapping for Label To PodIP for _, pod := range podsList.Items { - PodSelector.PodIps = append(PodSelector.PodIps, pod.Status.PodIP) - npLog.Info("Recv %s PodIp", pod.Status.PodIP) + for key, val := range pod.ObjectMeta.Labels { + lkey := getLabelSelector(key, val) + npLog.Infof("Update label Selector key:%v", lkey) + if ipMap, ok := PodSelector.labelPodMap[lkey]; ok { //Setup IpMap Tbl + ipMap[pod.Status.PodIP] = true + } + } } - npLog.Info("PodSelector %+v PodIp", PodSelector) - + //Recalculate Podselector Ips + k8sNet.updatePodSelectorPodIps(&PodSelector) + npLog.Info("PodSelector %+v", PodSelector) return &PodSelector, err } -func (k8sNet *k8sContext) createDefaultPolicy(tenantName string, epgName string) error { +//Update PodSelector Label IP mapping +func (k8sNet *k8sContext) updatePodSelectorLabelIPMap( + podSelector *k8sPodSelector, + labelSelector string, + ipList []string, + isAdd bool) { + //Check Nil + if podSelector == nil { + npLog.Infof("Nil Pod Selector") + return + } + if ipMap, ok := podSelector.labelPodMap[labelSelector]; ok { + for _, ip := range ipList { + if isAdd { //Add Pods + ipMap[ip] = true + } else { //remove Pods + delete(ipMap, ip) + } + } + npLog.Infof(" Pod Ips After Update Pod Selector:%+v event:%v", + podSelector, isAdd) + } + return +} + +//Create default deny rules for given policy +func (k8sNet *k8sContext) createPolicyWithDefaultRule(tenantName string, + epgName, policyName string) error { var err error + npLog.Infof("Create default policy for epg:%s policy:%s", + epgName, policyName) - if err = k8sNet.createPolicy(defaultTenantName, epgName); err != nil { + if err = k8sNet.createPolicy(defaultTenantName, + epgName, policyName); err != nil { + npLog.Infof("Failed to create Policy :%v", policyName) return err } - for _, direction := range []string{"in", "out"} { - if err = k8sNet.createRule(&client.Rule{ - TenantName: tenantName, - PolicyName: k8sutils.EpgNameToPolicy(epgName), - RuleID: k8sutils.DenyAllRuleID + direction, - Priority: k8sutils.DenyAllPriority, - Direction: direction, - Action: "allow", - }); err != nil { - return err - } + //Add default rule into policy + if err = k8sNet.createRule(&client.Rule{ + TenantName: tenantName, + PolicyName: policyName, + FromEndpointGroup: epgName, + RuleID: k8sutils.DenyAllRuleID + "in", + Priority: k8sutils.DenyAllPriority, + Direction: "in", + Action: "deny", + }); err != nil { + return err } + k8sNet.policyRules[policyName] = append(k8sNet.policyRules[policyName], + (k8sutils.AllowAllRuleID + "in")) return nil } +func (k8sNet *k8sContext) createDefaultDenyRule(tenantName, epgName, policyName string) error { + //Add default rule into policy + if err := k8sNet.createRule(&client.Rule{ + TenantName: tenantName, + PolicyName: policyName, + FromEndpointGroup: epgName, + RuleID: k8sutils.DenyAllRuleID + "in", + Priority: k8sutils.DenyAllPriority, + Direction: "in", + Action: "deny", + }); err != nil { + return err + } + k8sNet.policyRules[policyName] = append(k8sNet.policyRules[policyName], + (k8sutils.DenyAllRuleID + "in")) + return nil +} + +func (k8sNet *k8sContext) createDefaultPolicy(tenantName string, + epgName string) error { + return k8sNet.createPolicyWithDefaultRule(tenantName, + epgName, defaultPolicyName) +} diff --git a/netmaster/master/endpoint.go b/netmaster/master/endpoint.go old mode 100755 new mode 100644 index 689ab0af0..edc341b39 --- a/netmaster/master/endpoint.go +++ b/netmaster/master/endpoint.go @@ -119,10 +119,7 @@ func CreateEndpoint(stateDriver core.StateDriver, nwCfg *mastercfg.CfgNetworkSta epCfg.ServiceName = ep.ServiceName epCfg.EPCommonName = epReq.EPCommonName - // In ACI mode, if a pod does not have a group label, we will assume "default-group" - isAci, _ := IsAciConfigured() - - if isAci && (len(epCfg.ServiceName) == 0) { + if len(epCfg.ServiceName) == 0 { epCfg.ServiceName = "default-group" log.Infof("Over-riding null group with default-group for ep %s nw %s", epCfg.EndpointID, epCfg.NetID) } diff --git a/netmaster/objApi/apiController.go b/netmaster/objApi/apiController.go index 72494f5e1..384e11926 100644 --- a/netmaster/objApi/apiController.go +++ b/netmaster/objApi/apiController.go @@ -1585,6 +1585,7 @@ func (ac *APIController) RuleCreate(rule *contivModel.Rule) error { // find the policy policy := contivModel.FindPolicy(policyKey) + log.Infof("RuleCreate on Policy: %v", policy) if policy == nil { log.Errorf("Error finding policy %s", policyKey) return core.Errorf("Policy not found") diff --git a/utils/k8sutils/k8sutils.go b/utils/k8sutils/k8sutils.go index 624e3c33c..c20510a5b 100644 --- a/utils/k8sutils/k8sutils.go +++ b/utils/k8sutils/k8sutils.go @@ -31,6 +31,10 @@ const ( DenyAllRuleID = "deny-all-0-" // DenyAllPriority default deny all rule priority DenyAllPriority = 1 + // AllowAllRuleID default allow all rule id + AllowAllRuleID = "allow-all-0-" + // AllowAllPriority default deny all rule priority + AllowAllPriority = 1 // K8sTenantLabel k8s tenant label used by contiv K8sTenantLabel = "io.contiv.tenant" @@ -41,8 +45,8 @@ const ( ) // EpgNameToPolicy generate policy name from endpoint group -func EpgNameToPolicy(epgName string) string { - return epgName + "-policy" +func EpgNameToPolicy(epgName, policyName string) string { + return epgName + "-" + policyName + "-policy" } // PolicyToRuleID generate rule id from policy details @@ -50,6 +54,12 @@ func PolicyToRuleID(epgName string, protocol string, port int, direction string) return epgName + "-" + protocol + "-" + strconv.Itoa(port) + "-" + direction } +// PolicyToRuleID generate rule id from policy details +func PolicyToRuleIDUsingIps(InIps, FromIps string, port int, protocol, policyName string) string { + //return InIps + "-" + FromIps + "-" + strconv.Itoa(port) + "-" + protocol + return policyName + "-" + InIps + "-" + FromIps +} + // GetK8SConfig reads and parses the contivKubeCfgFile func GetK8SConfig(pCfg *ContivConfig) error { bytes, err := ioutil.ReadFile(contivKubeCfgFile)