diff --git a/pkg/loadbalancer/proxy.go b/pkg/loadbalancer/proxy.go
index 7cd203f5..c82356df 100644
--- a/pkg/loadbalancer/proxy.go
+++ b/pkg/loadbalancer/proxy.go
@@ -3,6 +3,7 @@ package loadbalancer
 import (
     "bytes"
     "context"
+    "fmt"
     "net"
     "strconv"
     "strings"
@@ -24,16 +25,15 @@ const proxyConfigPath = "/usr/local/etc/haproxy/haproxy.cfg"
 
 // proxyConfigData is supplied to the loadbalancer config template
 type proxyConfigData struct {
-    ServicePorts    []string
-    HealthCheckPort int
-    BackendServers  map[string][]backendServer
-    IPv6            bool
-    IPv4            bool
+    HealthCheckPort int             // is the same for all ServicePorts
+    ServicePorts    map[string]data // key is the IP family and Port to support MultiPort services
 }
 
-type backendServer struct {
-    Name    string
-    Address string
+type data struct {
+    // frontend
+    BindAddress string // *:Port for IPv4 :::Port for IPv6
+    // backend
+    Backends map[string]string // key: node name value: IP:Port
 }
 
 // proxyDefaultConfigTemplate is the loadbalancer config template
@@ -57,41 +57,17 @@ defaults
 # allow to boot despite dns don't resolve backends
 default-server init-addr none
 
-{{- if .IPv4}}
-frontend ipv4-frontend
-{{- $bind := "*:" }}
-{{- range $index, $port := .ServicePorts }}
-    bind {{$bind}}{{ $port }}
-{{- end}}
-    default_backend ipv4-backend
-{{end}}
-
-{{- if .IPv6}}
-frontend ipv6-frontend
-{{- $bind := ":::"}}
-{{- range $index, $port := .ServicePorts }}
-    bind {{$bind}}{{ $port }}
-{{- end}}
-    default_backend ipv6-backend
-{{end}}
-
-{{- if .IPv4}}
-backend ipv4-backend
-    option httpchk GET /healthz
-    {{- $hcport := .HealthCheckPort -}}
-    {{- range $i, $server := index .BackendServers "IPv4" }}
-    server {{ $server.Name }} {{ $server.Address }} check port {{ $hcport }} inter 5s fall 3 rise 1
-    {{- end}}
-{{end}}
+{{ range $index, $data := .ServicePorts }}
+frontend {{$index}}-frontend
+    bind {{ $data.BindAddress }}
+    default_backend {{$index}}-backend
 
-{{- if .IPv6}}
-backend ipv6-backend
+backend {{$index}}-backend
     option httpchk GET /healthz
-    {{- $hcport := .HealthCheckPort -}}
-    {{- range $i, $server := index .BackendServers "IPv6" }}
-    server {{ $server.Name }} {{ $server.Address }} check port {{ $hcport }} inter 5s fall 3 rise 1
+    {{- range $server, $address := $data.Backends }}
+    server {{ $server }} {{ $address }} check port {{ $.HealthCheckPort }} inter 5s fall 3 rise 1
     {{- end}}
-{{- end}}
+{{ end }}
 `
 
 // proxyConfig returns a kubeadm config generated from config data, in particular
@@ -110,52 +86,64 @@ func proxyConfig(data *proxyConfigData) (config string, err error) {
     return buff.String(), nil
 }
 
-func proxyUpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
-    name := loadBalancerName(clusterName, service)
+func generateConfig(service *v1.Service, nodes []*v1.Node) *proxyConfigData {
     if service == nil {
         return nil
     }
-    config := &proxyConfigData{
-        HealthCheckPort: 10256, // kube-proxy default port
-        BackendServers:  map[string][]backendServer{},
-        ServicePorts:    []string{},
-    }
-
-    config.IPv6 = len(service.Spec.IPFamilies) == 2 || service.Spec.IPFamilies[0] == v1.IPv6Protocol
-    config.IPv4 = len(service.Spec.IPFamilies) == 2 || service.Spec.IPFamilies[0] == v1.IPv4Protocol
+    hcPort := 10256 // kube-proxy default port
     if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
-        config.HealthCheckPort = int(service.Spec.HealthCheckNodePort)
+        hcPort = int(service.Spec.HealthCheckNodePort)
     }
 
-    backends := map[string][]backendServer{}
-    for _, n := range nodes {
-        for _, addr := range n.Status.Addresses {
-            if addr.Type == v1.NodeInternalIP {
-                if netutils.IsIPv4String(addr.Address) {
-                    backends[string(v1.IPv4Protocol)] = append(backends[string(v1.IPv4Protocol)], backendServer{Name: n.Name, Address: addr.Address})
-                } else if netutils.IsIPv6String(addr.Address) {
-                    backends[string(v1.IPv6Protocol)] = append(backends[string(v1.IPv6Protocol)], backendServer{Name: n.Name, Address: addr.Address})
-                }
-            }
-        }
+    lbConfig := &proxyConfigData{
+        HealthCheckPort: hcPort,
     }
 
-    // TODO: support UDP
-    for _, port := range service.Spec.Ports {
-        if port.Protocol != v1.ProtocolTCP {
-            continue
-        }
-        config.ServicePorts = append(config.ServicePorts, strconv.Itoa(int(port.Port)))
+    servicePortConfig := map[string]data{}
+    for _, ipFamily := range service.Spec.IPFamilies {
+        // TODO: support UDP
+        for _, port := range service.Spec.Ports {
+            if port.Protocol != v1.ProtocolTCP {
+                klog.Infof("service port protocol %s not supported", port.Protocol)
+                continue
+            }
+            key := fmt.Sprintf("%s_%d", string(ipFamily), port.Port)
+            bind := `*`
+            if ipFamily == v1.IPv6Protocol {
+                bind = `::`
+            }
+            servicePortConfig[key] = data{
+                BindAddress: fmt.Sprintf("%s:%d", bind, port.Port),
+                Backends:    map[string]string{},
+            }
 
-        for _, be := range backends {
-            for i := range be {
-                be[i].Address = net.JoinHostPort(be[i].Address, strconv.Itoa(int(port.NodePort)))
+            for _, n := range nodes {
+                for _, addr := range n.Status.Addresses {
+                    // only internal IPs supported
+                    if addr.Type != v1.NodeInternalIP {
+                        klog.V(2).Infof("address type %s, only %s supported", addr.Type, v1.NodeInternalIP)
+                        continue
+                    }
+                    // only addresses that match the Service IP family
+                    if (netutils.IsIPv4String(addr.Address) && ipFamily != v1.IPv4Protocol) ||
+                        (netutils.IsIPv6String(addr.Address) && ipFamily != v1.IPv6Protocol) {
+                        continue
+                    }
+                    servicePortConfig[key].Backends[n.Name] = net.JoinHostPort(addr.Address, strconv.Itoa(int(port.NodePort)))
+                }
             }
         }
     }
-    config.BackendServers = backends
-    klog.V(2).Infof("backend servers info: %v", backends)
+    lbConfig.ServicePorts = servicePortConfig
+    klog.V(2).Infof("haproxy config info: %+v", lbConfig)
+    return lbConfig
+}
 
+func proxyUpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
+    if service == nil {
+        return nil
+    }
+    config := generateConfig(service, nodes)
     // create loadbalancer config data
     loadbalancerConfig, err := proxyConfig(config)
     if err != nil {
@@ -164,6 +152,7 @@ func proxyUpdateLoadBalancer(ctx context.Context, clusterName string, service *v
     klog.V(2).Infof("updating loadbalancer with config %s", loadbalancerConfig)
 
     var stdout, stderr bytes.Buffer
+    name := loadBalancerName(clusterName, service)
    err = container.Exec(name, []string{"cp", "/dev/stdin", proxyConfigPath}, strings.NewReader(loadbalancerConfig), &stdout, &stderr)
     if err != nil {
         return err
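
Aside on the template change above (a standalone sketch, not code from this patch): each entry of ServicePorts, keyed "IPFamily_Port" such as "IPv4_80", now renders as its own frontend/backend pair, so multi-port and dual-stack Services fall out of a single range loop. The types and template here are trimmed copies of proxyConfigData, data, and proxyDefaultConfigTemplate from the diff; the sample values are assumptions for illustration.

package main

import (
    "os"
    "text/template"
)

// Trimmed copies of the patch's config types.
type data struct {
    BindAddress string
    Backends    map[string]string
}

type proxyConfigData struct {
    HealthCheckPort int
    ServicePorts    map[string]data
}

// Trimmed copy of the new frontend/backend section of the template.
const tmpl = `
{{ range $index, $data := .ServicePorts }}
frontend {{$index}}-frontend
    bind {{ $data.BindAddress }}
    default_backend {{$index}}-backend

backend {{$index}}-backend
    option httpchk GET /healthz
    {{- range $server, $address := $data.Backends }}
    server {{ $server }} {{ $address }} check port {{ $.HealthCheckPort }} inter 5s fall 3 rise 1
    {{- end}}
{{ end }}
`

func main() {
    // Assumed sample data: one IPv4 and one IPv6 entry for the same port.
    cfg := &proxyConfigData{
        HealthCheckPort: 32000,
        ServicePorts: map[string]data{
            "IPv4_80": {BindAddress: "*:80", Backends: map[string]string{"a": "10.0.0.1:30000"}},
            "IPv6_80": {BindAddress: ":::80", Backends: map[string]string{"a": "[2001:db2::3]:30000"}},
        },
    }
    // text/template visits map keys in sorted order, so the rendered
    // haproxy.cfg is deterministic: one frontend/backend pair per key.
    t := template.Must(template.New("haproxy").Parse(tmpl))
    if err := t.Execute(os.Stdout, cfg); err != nil {
        panic(err)
    }
}
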
diff --git a/pkg/loadbalancer/proxy_test.go b/pkg/loadbalancer/proxy_test.go
new file mode 100644
index 00000000..7236392c
--- /dev/null
+++ b/pkg/loadbalancer/proxy_test.go
@@ -0,0 +1,247 @@
+package loadbalancer
+
+import (
+    "reflect"
+    "testing"
+
+    "github.com/google/go-cmp/cmp"
+    v1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/util/intstr"
+)
+
+func makeNode(name string, ip string) *v1.Node {
+    return &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: name,
+        },
+        Spec: v1.NodeSpec{},
+        Status: v1.NodeStatus{
+            Addresses: []v1.NodeAddress{
+                {Type: v1.NodeInternalIP, Address: ip},
+            },
+        },
+    }
+}
+
+func makeService(name string) *v1.Service {
+    return &v1.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: name,
+        },
+        Spec: v1.ServiceSpec{
+            Type: v1.ServiceTypeClusterIP,
+            Ports: []v1.ServicePort{
+                {Port: 80},
+            },
+        },
+    }
+}
+
+func Test_generateConfig(t *testing.T) {
+    tests := []struct {
+        name    string
+        service *v1.Service
+        nodes   []*v1.Node
+        want    *proxyConfigData
+    }{
+        {
+            name: "empty",
+        },
+        {
+            name: "simple service",
+            service: &v1.Service{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name: "test",
+                },
+                Spec: v1.ServiceSpec{
+                    Type:                  v1.ServiceTypeLoadBalancer,
+                    ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
+                    IPFamilies:            []v1.IPFamily{v1.IPv4Protocol},
+                    Ports: []v1.ServicePort{
+                        {
+                            Port:       80,
+                            TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
+                            NodePort:   30000,
+                            Protocol:   v1.ProtocolTCP,
+                        },
+                    },
+                    HealthCheckNodePort: 32000,
+                },
+            },
+            nodes: []*v1.Node{
+                makeNode("a", "10.0.0.1"),
+                makeNode("b", "10.0.0.2"),
+            },
+            want: &proxyConfigData{
+                HealthCheckPort: 32000,
+                ServicePorts: map[string]data{
+                    "IPv4_80": data{BindAddress: "*:80", Backends: map[string]string{"a": "10.0.0.1:30000", "b": "10.0.0.2:30000"}},
+                },
+            },
+        },
+        {
+            name: "multiport service",
+            service: &v1.Service{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name: "test",
+                },
+                Spec: v1.ServiceSpec{
+                    Type:                  v1.ServiceTypeLoadBalancer,
+                    ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
+                    IPFamilies:            []v1.IPFamily{v1.IPv4Protocol},
+                    Ports: []v1.ServicePort{
+                        {
+                            Port:       80,
+                            TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
+                            NodePort:   30000,
+                            Protocol:   v1.ProtocolTCP,
+                        },
+                        {
+                            Port:       443,
+                            TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
+                            NodePort:   31000,
+                            Protocol:   v1.ProtocolTCP,
+                        },
+                    },
+                    HealthCheckNodePort: 32000,
+                },
+            },
+            nodes: []*v1.Node{
+                makeNode("a", "10.0.0.1"),
+                makeNode("b", "10.0.0.2"),
+            },
+            want: &proxyConfigData{
+                HealthCheckPort: 32000,
+                ServicePorts: map[string]data{
+                    "IPv4_80":  data{BindAddress: "*:80", Backends: map[string]string{"a": "10.0.0.1:30000", "b": "10.0.0.2:30000"}},
+                    "IPv4_443": data{BindAddress: "*:443", Backends: map[string]string{"a": "10.0.0.1:31000", "b": "10.0.0.2:31000"}},
+                },
+            },
+        },
+        {
+            name: "multiport service ipv6",
+            service: &v1.Service{
+                ObjectMeta: metav1.ObjectMeta{
+                    Name: "test",
+                },
+                Spec: v1.ServiceSpec{
+                    Type:                  v1.ServiceTypeLoadBalancer,
+                    ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyLocal,
+                    IPFamilies:            []v1.IPFamily{v1.IPv6Protocol},
+                    Ports: []v1.ServicePort{
+                        {
+                            Port:       80,
+                            TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
+                            NodePort:   30000,
+                            Protocol:   v1.ProtocolTCP,
+                        },
+                        {
+                            Port:       443,
+                            TargetPort: intstr.IntOrString{Type: intstr.Int, IntVal: 8080},
+                            NodePort:   31000,
+                            Protocol:   v1.ProtocolTCP,
+                        },
+                    },
+                    HealthCheckNodePort: 32000,
+                },
+            },
+            nodes: []*v1.Node{
+                makeNode("a", "2001:db2::3"),
+                makeNode("b", "2001:db2::4"),
+            },
+            want: &proxyConfigData{
+                HealthCheckPort: 32000,
+                ServicePorts: map[string]data{
+                    "IPv6_80":  data{BindAddress: `:::80`, Backends: map[string]string{"a": "[2001:db2::3]:30000", "b": "[2001:db2::4]:30000"}},
+                    "IPv6_443": data{BindAddress: `:::443`, Backends: map[string]string{"a": "[2001:db2::3]:31000", "b": "[2001:db2::4]:31000"}},
+                },
+            },
+        },
+    }
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            if got := generateConfig(tt.service, tt.nodes); !reflect.DeepEqual(got, tt.want) {
+                t.Errorf("generateConfig() = %+v,\n want %+v", got, tt.want)
+            }
+        })
+    }
+}
+
+func Test_proxyConfig(t *testing.T) {
+    // Only to check the templating; TODO: figure out how to assert better on the output
+    t.Skip()
+    tests := []struct {
+        name       string
+        data       *proxyConfigData
+        wantConfig string
+    }{
+        {
+            name: "ipv4",
+            data: &proxyConfigData{
+                HealthCheckPort: 32764,
+                ServicePorts: map[string]data{
+                    "IPv4_80": data{BindAddress: "*:80", Backends: map[string]string{
+                        "kind-worker":  "192.168.8.2:30497",
+                        "kind-worker2": "192.168.8.3:30497",
+                    }},
+                    "IPv4_443": data{BindAddress: "*:443", Backends: map[string]string{
+                        "kind-worker":  "192.168.8.2:31497",
+                        "kind-worker2": "192.168.8.3:31497",
+                    }},
+                },
+            },
+            wantConfig: `
+global
+log /dev/log local0
+log /dev/log local1 notice
+daemon
+
+resolvers docker
+nameserver dns 127.0.0.11:53
+
+defaults
+log global
+mode tcp
+option dontlognull
+# TODO: tune these
+timeout connect 5000
+timeout client 50000
+timeout server 50000
+# allow to boot despite dns don't resolve backends
+default-server init-addr none
+
+frontend IPv4_443-frontend
+    bind *:443
+    default_backend IPv4_443-backend
+
+backend IPv4_443-backend
+    option httpchk GET /healthz
+    server kind-worker 192.168.8.2:31497 check port 32764 inter 5s fall 3 rise 1
+    server kind-worker2 192.168.8.3:31497 check port 32764 inter 5s fall 3 rise 1
+
+frontend IPv4_80-frontend
+    bind *:80
+    default_backend IPv4_80-backend
+
+backend IPv4_80-backend
+    option httpchk GET /healthz
+    server kind-worker 192.168.8.2:30497 check port 32764 inter 5s fall 3 rise 1
+    server kind-worker2 192.168.8.3:30497 check port 32764 inter 5s fall 3 rise 1
+`,
+        },
+    }
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            gotConfig, err := proxyConfig(tt.data)
+            if err != nil {
+                t.Errorf("proxyConfig() error = %v", err)
+                return
+            }
+            if gotConfig != tt.wantConfig {
+                t.Errorf("proxyConfig() = %v , want %v", gotConfig, tt.wantConfig)
+                t.Errorf("proxyConfig() = %v", cmp.Diff(gotConfig, tt.wantConfig))
+            }
+        })
+    }
+}
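
A possible follow-up for the skipped Test_proxyConfig (a sketch under assumptions, not part of this change): since text/template visits map keys in sorted order, proxyConfig output is deterministic, so the open TODO about asserting on the output could likely be resolved by comparing whitespace-normalized strings. The helper below is hypothetical and assumes "strings" is added to the test imports.

// normalize trims each line and drops blank lines, so the comparison does
// not break on indentation-only template tweaks.
func normalize(s string) string {
    var lines []string
    for _, l := range strings.Split(s, "\n") {
        if trimmed := strings.TrimSpace(l); trimmed != "" {
            lines = append(lines, trimmed)
        }
    }
    return strings.Join(lines, "\n")
}

In the test body, the raw equality check would then become:

if normalize(gotConfig) != normalize(tt.wantConfig) {
    t.Errorf("proxyConfig() mismatch (-got +want):\n%s", cmp.Diff(normalize(gotConfig), normalize(tt.wantConfig)))
}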