Skip to content

Commit

Permalink
Backport of Add support for Nomad transparent proxy into release/1.4.x (
Browse files Browse the repository at this point in the history
#3830)

* backport of commit f3810b1

* backport of commit 0a2bc2c

* backport of commit 69687c3

---------

Co-authored-by: Tim Gross <[email protected]>
  • Loading branch information
hc-github-team-consul-core and tgross authored Mar 28, 2024
1 parent 486ea75 commit 4883169
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 43 deletions.
3 changes: 3 additions & 0 deletions .changelog/3795.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
control-plane: Add support for receiving iptables configuration via CNI arguments, to support Nomad transparent proxy
```
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pkg/
.idea/
.vscode
.bob/
control-plane/cni/cni
121 changes: 81 additions & 40 deletions control-plane/cni/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type CNIArgs struct {
K8S_POD_NAMESPACE types.UnmarshallableString
// K8S_POD_INFRA_CONTAINER_ID is the runtime container ID that the pod runs under.
K8S_POD_INFRA_CONTAINER_ID types.UnmarshallableString

// CONSUL_IPTABLES_CONFIG is the runtime iptables configuration passed by
// orchestrator (ex. the Nomad client agent)
CONSUL_IPTABLES_CONFIG types.UnmarshallableString
}

// PluginConf is is the configuration used by the plugin.
Expand All @@ -95,9 +99,8 @@ type PluginConf struct {
Multus bool `json:"multus"`
// Kubeconfig file name. Can be set as a cli flag.
Kubeconfig string `json:"kubeconfig"`
// LogLevl is the logging level. Can be set as a cli flag.
// LogLevel is the logging level. Can be set as a cli flag.
LogLevel string `json:"log_level"`
//
}

// parseConfig parses the supplied CNI configuration (and prevResult) from stdin.
Expand Down Expand Up @@ -132,9 +135,11 @@ func (c *Command) cmdAdd(args *skel.CmdArgs) error {

podNamespace := string(cniArgs.K8S_POD_NAMESPACE)
podName := string(cniArgs.K8S_POD_NAME)
cniArgsIPTablesCfg := string(cniArgs.CONSUL_IPTABLES_CONFIG)

// We should never encounter this unless there has been an error in the kubelet. A good safeguard.
if podNamespace == "" || podName == "" {
// We should never encounter this unless there has been an error in the
// kubelet. A good safeguard.
if (podNamespace == "" || podName == "") && cniArgsIPTablesCfg == "" {
return fmt.Errorf("not running in a pod, namespace and pod should have values")
}

Expand Down Expand Up @@ -167,49 +172,55 @@ func (c *Command) cmdAdd(args *skel.CmdArgs) error {
result = prevResult
}

ctx := context.Background()
if c.client == nil {
var iptablesCfg iptables.Config

// Connect to kubernetes.
restConfig, err := clientcmd.BuildConfigFromFlags("", filepath.Join(cfg.CNINetDir, cfg.Kubeconfig))
// If cniArgsIPTablesCfg is populated we're on Nomad, otherwise we're on K8s
if cniArgsIPTablesCfg != "" {
var err error
iptablesCfg, err = parseIPTablesFromCNIArgs(cniArgsIPTablesCfg)
if err != nil {
return fmt.Errorf("could not get rest config from kubernetes api: %s", err)
return err
}
} else {
if c.client == nil {
if err := c.createK8sClient(cfg); err != nil {
return err
}
}

c.client, err = kubernetes.NewForConfig(restConfig)
ctx := context.Background()
pod, err := c.client.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error initializing Kubernetes client: %s", err)
return fmt.Errorf("error retrieving pod: %s", err)
}
}

pod, err := c.client.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error retrieving pod: %s", err)
}

// Skip traffic redirection if the correct annotations are not on the pod.
if skipTrafficRedirection(*pod) {
logger.Debug("skipping traffic redirection because the pod is either not injected or transparent proxy is disabled: %s", pod.Name)
return types.PrintResult(result, cfg.CNIVersion)
}
// Skip traffic redirection if the correct annotations are not on the pod.
if skipTrafficRedirection(*pod) {
logger.Debug("skipping traffic redirection because the pod is either not injected or transparent proxy is disabled: %s", pod.Name)
return types.PrintResult(result, cfg.CNIVersion)
}

// We do not throw an error here because kubernetes will often throw a benign error where the pod has been
// updated in between the get and update of the annotation. Eventually kubernetes will update the annotation
ok := c.updateTransparentProxyStatusAnnotation(podName, podNamespace, waiting)
if !ok {
logger.Info("unable to update %s pod annotation to waiting", keyTransparentProxyStatus)
}
// We do not throw an error here because kubernetes will often throw a
// benign error where the pod has been updated in between the get and
// update of the annotation. Eventually kubernetes will update the
// annotation
ok := c.updateTransparentProxyStatusAnnotation(podName, podNamespace, waiting)
if !ok {
logger.Info("unable to update %s pod annotation to waiting", keyTransparentProxyStatus)
}

// Parse the cni-proxy-config annotation into an iptables.Config object.
iptablesCfg, err := parseAnnotation(*pod, annotationRedirectTraffic)
if err != nil {
return err
// Parse the cni-proxy-config annotation into an iptables.Config object.
iptablesCfg, err = parseAnnotation(*pod, annotationRedirectTraffic)
if err != nil {
return err
}
}

// Set NetNS passed through the CNI.
iptablesCfg.NetNS = args.Netns

// Set the provider to a fake provider in testing, otherwise use the default iptables.Provider
// Set the provider to a fake provider in testing, otherwise use the default
// iptables.Provider
if c.iptablesProvider != nil {
iptablesCfg.IptablesProvider = c.iptablesProvider
}
Expand All @@ -220,15 +231,21 @@ func (c *Command) cmdAdd(args *skel.CmdArgs) error {
return fmt.Errorf("could not apply iptables setup: %v", err)
}

// We do not throw an error here because kubernetes will often throw a benign error where the pod has been
// updated in between the get and update of the annotation. Eventually kubernetes will update the annotation
ok = c.updateTransparentProxyStatusAnnotation(podName, podNamespace, complete)
if !ok {
logger.Info("unable to update %s pod annotation to complete", keyTransparentProxyStatus)
if cniArgsIPTablesCfg == "" {

// We do not throw an error here because kubernetes will often throw a
// benign error where the pod has been updated in between the get and update
// of the annotation. Eventually kubernetes will update the annotation
ok := c.updateTransparentProxyStatusAnnotation(podName, podNamespace, complete)
if !ok {
logger.Info("unable to update %s pod annotation to complete", keyTransparentProxyStatus)
}
}

logger.Debug("traffic redirect rules applied to pod: %s", pod.Name)
// Pass through the result for the next plugin even though we are the final plugin in the chain.
logger.Debug("traffic redirect rules applied to pod: %s", podName)

// Pass through the result for the next plugin even if we are the final
// plugin in the chain.
return types.PrintResult(result, cfg.CNIVersion)
}

Expand All @@ -249,6 +266,21 @@ func main() {
skel.PluginMain(c.cmdAdd, cmdCheck, cmdDel, version.All, bv.BuildString("consul-cni"))
}

// createK8sClient configures the command's Kubernetes API client if it doesn't
// already exist
func (c *Command) createK8sClient(cfg *PluginConf) error {
restConfig, err := clientcmd.BuildConfigFromFlags("", filepath.Join(cfg.CNINetDir, cfg.Kubeconfig))
if err != nil {
return fmt.Errorf("could not get rest config from kubernetes api: %s", err)
}

c.client, err = kubernetes.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("error initializing Kubernetes client: %s", err)
}
return nil
}

// skipTrafficRedirection looks for annotations on the pod and determines if it should skip traffic redirection.
// The absence of the annotations is the equivalent of "disabled" because it means that the connect-inject mutating
// webhook did not run against the pod.
Expand All @@ -267,6 +299,15 @@ func skipTrafficRedirection(pod corev1.Pod) bool {
return false
}

func parseIPTablesFromCNIArgs(args string) (iptables.Config, error) {
cfg := iptables.Config{}
err := json.Unmarshal([]byte(args), &cfg)
if err != nil {
return cfg, fmt.Errorf("could not unmarshal CNI args: %w", err)
}
return cfg, nil
}

// parseAnnotation parses the cni-proxy-config annotation into an iptables.Config object.
func parseAnnotation(pod corev1.Pod, annotation string) (iptables.Config, error) {
anno, ok := pod.Annotations[annotation]
Expand Down
77 changes: 74 additions & 3 deletions control-plane/cni/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func Test_cmdAdd(t *testing.T) {
cmd *Command
podName string
stdInData string
cmdArgs *skel.CmdArgs
configuredPod func(*corev1.Pod, *Command) *corev1.Pod
expectedRules bool
expectedErr error
Expand Down Expand Up @@ -127,12 +128,33 @@ func Test_cmdAdd(t *testing.T) {
expectedErr: nil,
expectedRules: true, // Rules will be applied
},
{
name: "Parsing iptables from CNI_ARGs as in Nomad",
cmd: &Command{
client: fake.NewSimpleClientset(),
iptablesProvider: &fakeIptablesProvider{},
},
cmdArgs: &skel.CmdArgs{ContainerID: "some-container-id",
IfName: "eth0",
Args: fmt.Sprintf("CONSUL_IPTABLES_CONFIG=%s", minimalIPTablesJSON(t)),
Path: "/some/bin/path",
},
stdInData: nomadStdinData,
expectedErr: nil,
expectedRules: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
_ = c.configuredPod(minimalPod(c.podName), c.cmd)
err := c.cmd.cmdAdd(minimalSkelArgs(c.podName, defaultNamespace, c.stdInData))
require.Equal(t, c.expectedErr, err)
if c.cmdArgs != nil {
c.cmdArgs.StdinData = []byte(c.stdInData)
err := c.cmd.cmdAdd(c.cmdArgs)
require.Equal(t, c.expectedErr, err)
} else {
_ = c.configuredPod(minimalPod(c.podName), c.cmd)
err := c.cmd.cmdAdd(minimalSkelArgs(c.podName, defaultNamespace, c.stdInData))
require.Equal(t, c.expectedErr, err)
}

// Check to see that rules have been generated
if c.expectedErr == nil && c.expectedRules {
Expand Down Expand Up @@ -355,3 +377,52 @@ const missingIPsStdinData = `{
"name": "consul-cni",
"type": "consul-cni"
}`

const nomadStdinData = `{
"cniVersion": "0.4.0",
"dns": {},
"prevResult": {
"cniversion": "0.4.0",
"interfaces": [
{
"name": "eth0",
"mac": "aa:bb:cc:dd:ee:ff",
"sandbox": "/var/rum/netns/16c"
}
],
"ips": [
{
"version": "4",
"address": "10.0.0.2/24",
"gateway": "10.0.0.1",
"interface": 0
}
],
"routes": []
},
"cni_bin_dir": "/opt/cni/bin",
"cni_net_dir": "/etc/cni/net.d",
"log_level": "info",
"name": "nomad",
"type": "consul-cni"
}
`

func minimalIPTablesJSON(t *testing.T) string {
cfg := iptables.Config{
ConsulDNSIP: "127.0.0.1",
ConsulDNSPort: 8600,
ProxyUserID: "101",
ProxyInboundPort: 20000,
ProxyOutboundPort: 15001,
ExcludeInboundPorts: []string{"9000"},
ExcludeOutboundPorts: []string{"15002"},
ExcludeOutboundCIDRs: []string{"10.0.0.0/24"},
ExcludeUIDs: []string{"1", "42"},
NetNS: "/some/netns/path",
}
buf, err := json.Marshal(cfg)
require.NoError(t, err)
return string(buf)
}

0 comments on commit 4883169

Please sign in to comment.