From 4d64ba132637fec72c925d66a354fa79f8261c71 Mon Sep 17 00:00:00 2001 From: Jarno Rajahalme Date: Sun, 25 Oct 2020 14:10:57 -0700 Subject: [PATCH] clustermesh-apiserver: Add to cilium repo Move clustermesh-apiserver to Cilium repo to ease development. Add support scripts for mTLS config management to contrib/k8s. Signed-off-by: Jarno Rajahalme --- Makefile.docker | 22 +- clustermesh-apiserver.Dockerfile | 41 ++ clustermesh-apiserver/.gitignore | 1 + clustermesh-apiserver/Makefile | 26 + clustermesh-apiserver/README.md | 29 + clustermesh-apiserver/etcd-config.yaml | 6 + clustermesh-apiserver/main.go | 626 ++++++++++++++++++ clustermesh-apiserver/test/mock.json | 15 + clustermesh-apiserver/test/runtime.yaml | 8 + clustermesh-apiserver/test/testccnp.yaml | 17 + clustermesh-apiserver/tls.rst | 240 +++++++ clustermesh-apiserver/vmmanager.go | 454 +++++++++++++ ...k8s-extract-clustermesh-nodeport-secret.sh | 55 ++ contrib/k8s/k8s-import-clustermesh-secrets.sh | 50 ++ 14 files changed, 1588 insertions(+), 2 deletions(-) create mode 100644 clustermesh-apiserver.Dockerfile create mode 100644 clustermesh-apiserver/.gitignore create mode 100644 clustermesh-apiserver/Makefile create mode 100644 clustermesh-apiserver/README.md create mode 100644 clustermesh-apiserver/etcd-config.yaml create mode 100644 clustermesh-apiserver/main.go create mode 100644 clustermesh-apiserver/test/mock.json create mode 100644 clustermesh-apiserver/test/runtime.yaml create mode 100644 clustermesh-apiserver/test/testccnp.yaml create mode 100644 clustermesh-apiserver/tls.rst create mode 100644 clustermesh-apiserver/vmmanager.go create mode 100755 contrib/k8s/k8s-extract-clustermesh-nodeport-secret.sh create mode 100755 contrib/k8s/k8s-import-clustermesh-secrets.sh diff --git a/Makefile.docker b/Makefile.docker index 095aa70be57ed..f2ee26ca9c075 100644 --- a/Makefile.docker +++ b/Makefile.docker @@ -14,9 +14,9 @@ docker-cilium-image-for-developers: --build-arg LIBNETWORK_PLUGIN=\ -t $(DOCKER_DEV_ACCOUNT)/cilium-dev:latest . 
-f ./cilium-dev.Dockerfile -docker-images-all: docker-cilium-image docker-plugin-image docker-hubble-relay-image docker-operator-images-all +docker-images-all: docker-cilium-image docker-plugin-image docker-hubble-relay-image docker-clustermesh-apiserver-image docker-operator-images-all -docker-images-all-unstripped: docker-cilium-image-unstripped docker-plugin-image-unstripped docker-hubble-relay-image-unstripped docker-operator-images-all-unstripped +docker-images-all-unstripped: docker-cilium-image-unstripped docker-plugin-image-unstripped docker-hubble-relay-image-unstripped docker-clustermesh-apiserver-image-unstripped docker-operator-images-all-unstripped docker-operator-images-all: docker-operator-image docker-operator-aws-image docker-operator-azure-image docker-operator-generic-image @@ -145,4 +145,22 @@ docker-hubble-relay-image-unstripped: NOSTRIP=1 docker-hubble-relay-image-unstripped: UNSTRIPPED=-unstripped docker-hubble-relay-image-unstripped: docker-hubble-relay-image +docker-clustermesh-apiserver-image: $(BUILD_DIR)/clustermesh-apiserver.Dockerfile build-context-update + $(QUIET)$(CONTAINER_ENGINE) build \ + $(DOCKER_FLAGS) \ + --build-arg BASE_IMAGE=${BASE_IMAGE} \ + --build-arg NOSTRIP=${NOSTRIP} \ + --build-arg LOCKDEBUG=${LOCKDEBUG} \ + --build-arg RACE=${RACE}\ + --build-arg CILIUM_SHA=$(firstword $(GIT_VERSION)) \ + -f $(BUILD_DIR)/clustermesh-apiserver.Dockerfile \ + -t cilium/clustermesh-apiserver$(UNSTRIPPED):$(DOCKER_IMAGE_TAG) $(DOCKER_BUILD_DIR) + $(QUIET)$(CONTAINER_ENGINE) tag cilium/clustermesh-apiserver$(UNSTRIPPED):$(DOCKER_IMAGE_TAG) cilium/clustermesh-apiserver$(UNSTRIPPED):$(DOCKER_IMAGE_TAG)-${GOARCH} + @echo "Push like this when ready:" + @echo "${CONTAINER_ENGINE} push cilium/clustermesh-apiserver$(UNSTRIPPED):$(DOCKER_IMAGE_TAG)-${GOARCH}" + +docker-clustermesh-apiserver-image-unstripped: NOSTRIP=1 +docker-clustermesh-apiserver-image-unstripped: UNSTRIPPED=-unstripped +docker-clustermesh-apiserver-image-unstripped: docker-clustermesh-apiserver-image + .PHONY: docker-image-runtime docker-image-builder docker-cilium-manifest docker-cilium-dev-manifest docker-operator-manifest docker-plugin-manifest docker-cilium-runtime-manifest docker-cilium-builder-manifest diff --git a/clustermesh-apiserver.Dockerfile b/clustermesh-apiserver.Dockerfile new file mode 100644 index 0000000000000..53b3d16b25987 --- /dev/null +++ b/clustermesh-apiserver.Dockerfile @@ -0,0 +1,41 @@ +# (first line comment needed for DOCKER_BUILDKIT use) +# +ARG BASE_IMAGE=scratch + +FROM docker.io/library/golang:1.15.3 as builder +ARG CILIUM_SHA="" +LABEL cilium-sha=${CILIUM_SHA} +ADD . 
/go/src/github.com/cilium/cilium +WORKDIR /go/src/github.com/cilium/cilium/clustermesh-apiserver +ARG NOSTRIP +ARG LOCKDEBUG +ARG RACE +RUN make RACE=${RACE} NOSTRIP=${NOSTRIP} LOCKDEBUG=${LOCKDEBUG} + +# CGO_ENABLED=0 GOOS=linux go build + +FROM docker.io/library/alpine:3.12.0 as certs +ARG CILIUM_SHA="" +LABEL cilium-sha=${CILIUM_SHA} +RUN apk --update add ca-certificates + +FROM docker.io/library/golang:1.15.3 as gops +ARG CILIUM_SHA="" +LABEL cilium-sha=${CILIUM_SHA} +RUN go get -d github.com/google/gops && \ + cd /go/src/github.com/google/gops && \ + git checkout -b v0.3.10 v0.3.10 && \ + git --no-pager remote -v && \ + git --no-pager log -1 && \ + CGO_ENABLED=0 go install && \ + strip /go/bin/gops + +FROM ${BASE_IMAGE} +ARG CILIUM_SHA="" +LABEL cilium-sha=${CILIUM_SHA} +LABEL maintainer="maintainer@cilium.io" +COPY --from=builder /go/src/github.com/cilium/cilium/clustermesh-apiserver/etcd-config.yaml /var/lib/cilium/etcd-config.yaml +COPY --from=builder /go/src/github.com/cilium/cilium/clustermesh-apiserver/clustermesh-apiserver /usr/bin/clustermesh-apiserver +COPY --from=certs /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=gops /go/bin/gops /bin/gops +ENTRYPOINT ["/usr/bin/clustermesh-apiserver"] diff --git a/clustermesh-apiserver/.gitignore b/clustermesh-apiserver/.gitignore new file mode 100644 index 0000000000000..d2021fc9b53e0 --- /dev/null +++ b/clustermesh-apiserver/.gitignore @@ -0,0 +1 @@ +clustermesh-apiserver diff --git a/clustermesh-apiserver/Makefile b/clustermesh-apiserver/Makefile new file mode 100644 index 0000000000000..77957caf7cd70 --- /dev/null +++ b/clustermesh-apiserver/Makefile @@ -0,0 +1,26 @@ +# Copyright 2017-2020 Authors of Cilium +# SPDX-License-Identifier: Apache-2.0 + +include ../Makefile.defs + +TARGET := clustermesh-apiserver + +.PHONY: all $(TARGET) clean install + +all: $(TARGET) + +$(TARGET): + @$(ECHO_GO) + $(QUIET)$(GO_BUILD) -o $@ + +clean: + @$(ECHO_CLEAN) + -$(QUIET)rm -f $(TARGET) + $(QUIET)$(GO_CLEAN) + +install: + $(QUIET)$(INSTALL) -m 0755 -d $(DESTDIR)$(BINDIR) + $(QUIET)$(INSTALL) -m 0755 $(TARGET) $(DESTDIR)$(BINDIR) + $(QUIET)$(INSTALL) -m 0755 -d $(DESTDIR)$(CONFDIR)/bash_completion.d + ./$(TARGET) completion bash > $(TARGET)_bash_completion + $(QUIET)$(INSTALL) -m 0644 -T $(TARGET)_bash_completion $(DESTDIR)$(CONFDIR)/bash_completion.d/$(TARGET) diff --git a/clustermesh-apiserver/README.md b/clustermesh-apiserver/README.md new file mode 100644 index 0000000000000..e2ac8faa33387 --- /dev/null +++ b/clustermesh-apiserver/README.md @@ -0,0 +1,29 @@ +# API server for Cilium ClusterMesh + +## Deploy the clustermesh-apiserver + +Cilium Helm charts automatically deploy clustermesh-apiserver when Cilium +cluster.name is not "default". Remember to set a non-zero cluster.id in Helm as +well. `clustermesh-apiserver` service type defaults to `NodePort`. Depending on +your k8s provider it may be beneficial to change this to `LoadBalancer`: + + $ helm install cilium ... \ + --set clustermesh.apiserver.service.type=LoadBalancer \ + +Additionally, if your load balancer can give you a static IP address, it may be +specified like so: + + $ helm install cilium ... \ + --set clustermesh.apiserver.service.loadBalancerIP=xxx.xxx.xxx.xxx \ + +## Connect Cilium clusters in to a clustermesh + +1. 
Extract a `cilium-clustermesh` secret from each cluster to be applied in another cluster: + + $ contrib/k8s/k8s-extract-clustermesh-nodeport-secret.sh > cluster1-secret.json + + Repeat this step in all your clusters, storing the outputs into different files. + +3. Apply secrets from all other clusters in each of your clusters, e.g., on cluster1: + + $ contrib/k8s/k8s-import-clustermesh-secrets.sh cluster2-secret.json cluster3-secret.json ... diff --git a/clustermesh-apiserver/etcd-config.yaml b/clustermesh-apiserver/etcd-config.yaml new file mode 100644 index 0000000000000..0596006a80013 --- /dev/null +++ b/clustermesh-apiserver/etcd-config.yaml @@ -0,0 +1,6 @@ +--- +trusted-ca-file: /var/lib/cilium/etcd-secrets/ca.crt +key-file: /var/lib/cilium/etcd-secrets/tls.key +cert-file: /var/lib/cilium/etcd-secrets/tls.crt +endpoints: +- https://127.0.0.1:2379 diff --git a/clustermesh-apiserver/main.go b/clustermesh-apiserver/main.go new file mode 100644 index 0000000000000..95b4672bf5c99 --- /dev/null +++ b/clustermesh-apiserver/main.go @@ -0,0 +1,626 @@ +// Copyright 2018-2020 Authors of Cilium +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "os" + "os/signal" + "path" + "reflect" + "strings" + "time" + + "github.com/cilium/cilium/pkg/defaults" + "github.com/cilium/cilium/pkg/identity" + identityCache "github.com/cilium/cilium/pkg/identity/cache" + "github.com/cilium/cilium/pkg/ipcache" + "github.com/cilium/cilium/pkg/k8s" + ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + clientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned" + k8sconfig "github.com/cilium/cilium/pkg/k8s/config" + "github.com/cilium/cilium/pkg/k8s/informer" + slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1" + "github.com/cilium/cilium/pkg/k8s/types" + "github.com/cilium/cilium/pkg/kvstore" + "github.com/cilium/cilium/pkg/kvstore/store" + "github.com/cilium/cilium/pkg/labels" + "github.com/cilium/cilium/pkg/logging" + "github.com/cilium/cilium/pkg/logging/logfields" + nodeStore "github.com/cilium/cilium/pkg/node/store" + nodeTypes "github.com/cilium/cilium/pkg/node/types" + "github.com/cilium/cilium/pkg/option" + serviceStore "github.com/cilium/cilium/pkg/service/store" + + gops "github.com/google/gops/agent" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "golang.org/x/sys/unix" + k8sv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" +) + +var ( + log = logging.DefaultLogger.WithField(logfields.LogSubsys, "clustermesh-apiserver") + + rootCmd = &cobra.Command{ + Use: "clustermesh-apiserver", + Short: "Run the ClusterMesh apiserver", + Run: func(cmd *cobra.Command, args []string) { + runServer(cmd) + }, + } + + mockFile string + clusterID int + clusterName string + ciliumK8sClient clientset.Interface + + shutdownSignal = make(chan struct{}) + + 
ciliumNodeRegisterStore *store.SharedStore + ciliumNodeStore *store.SharedStore + ciliumServiceStore *store.SharedStore + serviceCache k8s.ServiceCache + + identityStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) +) + +func installSigHandler() { + signals := make(chan os.Signal, 1) + signal.Notify(signals, unix.SIGINT, unix.SIGTERM) + + go func() { + <-signals + close(shutdownSignal) + }() +} + +func readMockFile(path string) error { + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("unable to open file %s: %s", path, err) + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + + switch { + case strings.Contains(line, "\"CiliumIdentity\""): + var identity ciliumv2.CiliumIdentity + err := json.Unmarshal([]byte(line), &identity) + if err != nil { + log.WithError(err).WithField("line", line).Warning("Unable to unmarshal CiliumIdentity") + } else { + updateIdentity(&identity) + } + case strings.Contains(line, "\"CiliumNode\""): + var node ciliumv2.CiliumNode + err = json.Unmarshal([]byte(line), &node) + if err != nil { + log.WithError(err).WithField("line", line).Warning("Unable to unmarshal CiliumNode") + } else { + updateNode(&node) + } + case strings.Contains(line, "\"CiliumEndpoint\""): + var endpoint types.CiliumEndpoint + err = json.Unmarshal([]byte(line), &endpoint) + if err != nil { + log.WithError(err).WithField("line", line).Warning("Unable to unmarshal CiliumEndpoint") + } else { + updateEndpoint(&endpoint) + } + case strings.Contains(line, "\"Service\""): + var service slim_corev1.Service + err = json.Unmarshal([]byte(line), &service) + if err != nil { + log.WithError(err).WithField("line", line).Warning("Unable to unmarshal Service") + } else { + serviceCache.UpdateService(&service, nil) + } + case strings.Contains(line, "\"Endpoints\""): + var endpoints slim_corev1.Endpoints + err = json.Unmarshal([]byte(line), &endpoints) + if err != nil { + log.WithError(err).WithField("line", line).Warning("Unable to unmarshal Endpoints") + } else { + serviceCache.UpdateEndpoints(&endpoints, nil) + } + default: + log.Warningf("Unknown line in mockfile %s: %s", path, line) + } + } + + if err := scanner.Err(); err != nil { + return err + } + + return nil +} + +func runApiserver() error { + if err := gops.Listen(gops.Options{}); err != nil { + return fmt.Errorf("unable to start gops: %s", err) + } + + flags := rootCmd.Flags() + flags.BoolP(option.DebugArg, "D", false, "Enable debugging mode") + option.BindEnv(option.DebugArg) + + flags.String(option.IdentityAllocationMode, option.IdentityAllocationModeCRD, "Method to use for identity allocation") + option.BindEnv(option.IdentityAllocationMode) + + flags.IntVar(&clusterID, option.ClusterIDName, 0, "Cluster ID") + option.BindEnv(option.ClusterIDName) + + flags.StringVar(&clusterName, option.ClusterName, "default", "Cluster name") + option.BindEnv(option.ClusterName) + + flags.StringVar(&mockFile, "mock-file", "", "Read from mock file") + + flags.Duration(option.KVstoreConnectivityTimeout, defaults.KVstoreConnectivityTimeout, "Time after which an incomplete kvstore operation is considered failed") + option.BindEnv(option.KVstoreConnectivityTimeout) + + flags.Duration(option.KVstoreLeaseTTL, defaults.KVstoreLeaseTTL, "Time-to-live for the KVstore lease.") + flags.MarkHidden(option.KVstoreLeaseTTL) + option.BindEnv(option.KVstoreLeaseTTL) + + flags.Duration(option.KVstorePeriodicSync, defaults.KVstorePeriodicSync, "Periodic KVstore synchronization interval") + 
option.BindEnv(option.KVstorePeriodicSync) + + flags.Var(option.NewNamedMapOptions(option.KVStoreOpt, &option.Config.KVStoreOpt, nil), + option.KVStoreOpt, "Key-value store options") + option.BindEnv(option.KVStoreOpt) + + viper.BindPFlags(flags) + option.Config.Populate() + + if err := rootCmd.Execute(); err != nil { + return err + } + + return nil +} + +func main() { + log.Infof("Starting Cilium ClusterMesh apiserver...") + + installSigHandler() + + if err := runApiserver(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(-1) + } +} + +func startApi() { + http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { + statusCode := http.StatusOK + reply := "ok" + + if _, err := k8s.Client().Discovery().ServerVersion(); err != nil { + statusCode = http.StatusInternalServerError + reply = err.Error() + } + w.WriteHeader(statusCode) + if _, err := w.Write([]byte(reply)); err != nil { + log.WithError(err).Error("Failed to respond to /healthz request") + } + }) + + srv := &http.Server{} + + go func() { + if err := srv.ListenAndServe(); err != nil { + log.WithError(err).Fatalf("Unable to start health API") + } + + <-shutdownSignal + if err := srv.Shutdown(context.Background()); err != nil { + log.WithError(err).Error("Unable to shutdown health API") + } + }() + log.Info("Started health API") +} + +func parseLabelArrayFromMap(base map[string]string) labels.LabelArray { + array := make(labels.LabelArray, 0, len(base)) + for sourceAndKey, value := range base { + array = append(array, labels.NewLabel(sourceAndKey, value, "")) + } + return array.Sort() +} + +func updateIdentity(obj interface{}) { + identity, ok := obj.(*ciliumv2.CiliumIdentity) + if !ok { + log.Warningf("Unknown CiliumIdentity object type %s received: %+v", reflect.TypeOf(obj), obj) + return + } + + if identity == nil || identity.SecurityLabels == nil { + log.Warningf("Ignoring invalid identity %+v", identity) + return + } + + keyPath := path.Join(identityCache.IdentitiesPath, "id", identity.Name) + labelArray := parseLabelArrayFromMap(identity.SecurityLabels) + + var key []byte + for _, l := range labelArray { + key = append(key, []byte(l.FormatForKVStore())...) 
+ } + + if len(key) == 0 { + return + } + + keyEncoded := []byte(kvstore.Client().Encode(key)) + log.WithFields(logrus.Fields{"key": keyPath, "value": string(keyEncoded)}).Info("Updating identity in etcd") + + _, err := kvstore.Client().UpdateIfDifferent(context.Background(), keyPath, keyEncoded, true) + if err != nil { + log.WithError(err).Warningf("Unable to update identity %s in etcd", keyPath) + } +} + +func deleteIdentity(obj interface{}) { + identity, ok := obj.(*ciliumv2.CiliumIdentity) + if !ok { + log.Warningf("Unknown CiliumIdentity object type %s received: %+v", reflect.TypeOf(obj), obj) + return + } + + if identity == nil { + log.Warningf("Igoring invalid identity %+v", identity) + return + } + + keyPath := path.Join(identityCache.IdentitiesPath, "id", identity.Name) + err := kvstore.Client().Delete(context.Background(), keyPath) + if err != nil { + log.WithError(err).Warningf("Unable to delete identity %s in etcd", keyPath) + } +} + +func synchronizeIdentities() { + identityInformer := informer.NewInformerWithStore( + cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(), + "ciliumidentities", k8sv1.NamespaceAll, fields.Everything()), + &ciliumv2.CiliumIdentity{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: updateIdentity, + UpdateFunc: func(oldObj, newObj interface{}) { + updateIdentity(newObj) + }, + DeleteFunc: func(obj interface{}) { + deletedObj, ok := obj.(cache.DeletedFinalStateUnknown) + if ok { + deleteIdentity(deletedObj.Obj) + } else { + deleteIdentity(obj) + } + }, + }, + nil, + identityStore, + ) + + go identityInformer.Run(wait.NeverStop) +} + +type nodeStub string + +func (n nodeStub) GetKeyName() string { return string(n) } + +func updateNode(obj interface{}) { + if ciliumNode, ok := obj.(*ciliumv2.CiliumNode); ok { + n := nodeTypes.ParseCiliumNode(ciliumNode) + n.Cluster = clusterName + n.ClusterID = clusterID + if err := ciliumNodeStore.UpdateLocalKeySync(context.Background(), &n); err != nil { + log.WithError(err).Warning("Unable to insert node into etcd") + } else { + log.Infof("Inserted node into etcd: %v", n) + } + } else { + log.Warningf("Unknown CiliumNode object type %s received: %+v", reflect.TypeOf(obj), obj) + } +} + +func deleteNode(obj interface{}) { + n, ok := obj.(*ciliumv2.CiliumNode) + if ok { + ciliumNodeStore.DeleteLocalKey(context.Background(), nodeStub(n.Name)) + } else { + log.Warningf("Unknown CiliumNode object type %s received: %+v", reflect.TypeOf(obj), obj) + } +} + +func synchronizeNodes() { + _, ciliumNodeInformer := informer.NewInformer( + cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(), + "ciliumnodes", k8sv1.NamespaceAll, fields.Everything()), + &ciliumv2.CiliumNode{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: updateNode, + UpdateFunc: func(_, newObj interface{}) { + updateNode(newObj) + }, + DeleteFunc: func(obj interface{}) { + deletedObj, ok := obj.(cache.DeletedFinalStateUnknown) + if ok { + deleteNode(deletedObj.Obj) + } else { + deleteNode(obj) + } + }, + }, + k8s.ConvertToCiliumNode, + ) + + go ciliumNodeInformer.Run(wait.NeverStop) +} + +func updateEndpoint(obj interface{}) { + e, ok := obj.(*types.CiliumEndpoint) + if !ok { + log.Warningf("Unknown CiliumEndpoint object type %s received: %+v", reflect.TypeOf(obj), obj) + return + } + + if n := e.Networking; n != nil { + for _, address := range n.Addressing { + for _, ip := range []string{address.IPV4, address.IPV6} { + if ip == "" { + continue + } + + keyPath := path.Join(ipcache.IPIdentitiesPath, 
ipcache.DefaultAddressSpace, ip) + entry := identity.IPIdentityPair{ + IP: net.ParseIP(ip), + Metadata: "", + HostIP: net.ParseIP(n.NodeIP), + K8sNamespace: e.Namespace, + K8sPodName: e.Name, + } + + if e.Identity != nil { + entry.ID = identity.NumericIdentity(e.Identity.ID) + } + + if e.Encryption != nil { + entry.Key = uint8(e.Encryption.Key) + } + + marshaledEntry, err := json.Marshal(entry) + if err != nil { + log.WithError(err).Warningf("Unable to JSON marshal entry %#v", entry) + continue + } + + _, err = kvstore.Client().UpdateIfDifferent(context.Background(), keyPath, marshaledEntry, true) + if err != nil { + log.WithError(err).Warningf("Unable to update endpoint %s in etcd", keyPath) + } else { + log.Infof("Inserted endpoint into etcd: %v", entry) + } + } + } + } +} + +func deleteEndpoint(obj interface{}) { + e, ok := obj.(*types.CiliumEndpoint) + if !ok { + log.Warningf("Unknown CiliumEndpoint object type %s received: %+v", reflect.TypeOf(obj), obj) + return + } + + if n := e.Networking; n != nil { + for _, address := range n.Addressing { + for _, ip := range []string{address.IPV4, address.IPV6} { + if ip == "" { + continue + } + + keyPath := path.Join(ipcache.IPIdentitiesPath, ipcache.DefaultAddressSpace, ip) + if err := kvstore.Client().Delete(context.Background(), keyPath); err != nil { + log.WithError(err).Warningf("Unable to delete endpoint %s in etcd", keyPath) + } + } + } + } +} + +func synchronizeEndpoints() { + _, ciliumNodeInformer := informer.NewInformer( + cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(), + "ciliumendpoints", k8sv1.NamespaceAll, fields.Everything()), + &ciliumv2.CiliumEndpoint{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: updateEndpoint, + UpdateFunc: func(_, newObj interface{}) { + updateEndpoint(newObj) + }, + DeleteFunc: func(obj interface{}) { + deletedObj, ok := obj.(cache.DeletedFinalStateUnknown) + if ok { + deleteEndpoint(deletedObj.Obj) + } else { + deleteEndpoint(obj) + } + }, + }, + k8s.ConvertToCiliumEndpoint, + ) + + go ciliumNodeInformer.Run(wait.NeverStop) +} + +func handleServiceEvent(event k8s.ServiceEvent, store *store.SharedStore) { + svc := k8s.NewClusterService(event.ID, event.Service, event.Endpoints) + svc.Cluster = clusterName + + switch event.Action { + case k8s.UpdateService: + if event.Service.Shared { + store.UpdateLocalKeySync(context.Background(), &svc) + break + } + // The annotation may have been removed, delete an eventual existing service + fallthrough + + case k8s.DeleteService: + store.DeleteLocalKey(context.Background(), &svc) + } +} + +func synchronizeServices() { + updateService := func(obj interface{}) { + svc := k8s.ObjToV1Services(obj) + serviceCache.UpdateService(svc, nil) + } + + deleteService := func(obj interface{}) { + svc := k8s.ObjToV1Services(obj) + serviceCache.DeleteService(svc, nil) + } + + _, serviceInformer := informer.NewInformer( + cache.NewListWatchFromClient(k8s.Client().CoreV1().RESTClient(), + "services", k8sv1.NamespaceAll, fields.Everything()), + &k8sv1.Service{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: updateService, + UpdateFunc: func(_, newObj interface{}) { + updateService(newObj) + }, + DeleteFunc: func(obj interface{}) { + deletedObj, ok := obj.(cache.DeletedFinalStateUnknown) + if ok { + deleteService(deletedObj.Obj) + } else { + deleteService(obj) + } + }, + }, + k8s.ConvertToK8sService, + ) + + go serviceInformer.Run(wait.NeverStop) +} + +func runServer(cmd *cobra.Command) { + if mockFile == "" { + k8s.Configure("", "", 0.0, 0) + if err := 
k8s.Init(k8sconfig.NewDefaultConfiguration()); err != nil { + log.WithError(err).Fatal("Unable to connect to Kubernetes apiserver") + } + ciliumK8sClient = k8s.CiliumClient() + } + + mgr := NewVMManager(ciliumK8sClient) + + go startApi() + + if err := kvstore.Setup(context.Background(), "etcd", option.Config.KVStoreOpt, nil); err != nil { + log.WithError(err).Fatal("Unable to connect to etcd") + } + + s, err := store.JoinSharedStore(store.Configuration{ + Prefix: nodeStore.NodeRegisterStorePrefix, + KeyCreator: nodeStore.RegisterKeyCreator, + Observer: mgr, + }) + if err != nil { + log.WithError(err).Fatal("Unable to set up node store in etcd") + } + ciliumNodeRegisterStore = s + + s, err = store.JoinSharedStore(store.Configuration{ + Prefix: nodeStore.NodeStorePrefix, + KeyCreator: nodeStore.KeyCreator, + }) + if err != nil { + log.WithError(err).Fatal("Unable to set up node store in etcd") + } + ciliumNodeStore = s + + s, err = store.JoinSharedStore(store.Configuration{ + Prefix: serviceStore.ServiceStorePrefix, + KeyCreator: func() store.Key { + return &serviceStore.ClusterService{} + }, + }) + if err != nil { + log.WithError(err).Fatal("Unable to setup service store in etcd") + } + ciliumServiceStore = s + + serviceCache = k8s.NewServiceCache(nil) + go func() { + for { + event, ok := <-serviceCache.Events + if !ok { + return + } + handleServiceEvent(event, ciliumServiceStore) + } + }() + + if mockFile != "" { + if err := readMockFile(mockFile); err != nil { + log.WithError(err).Fatal("Unable to read mock file") + } + } else { + synchronizeIdentities() + synchronizeNodes() + synchronizeEndpoints() + synchronizeServices() + } + + go func() { + for { + ctx, cancel := context.WithTimeout(context.Background(), defaults.LockLeaseTTL) + err := kvstore.Client().Update(ctx, kvstore.HeartbeatPath, []byte(time.Now().Format(time.RFC3339)), true) + if err != nil { + log.WithError(err).Warning("Unable to update heartbeat key") + } + cancel() + <-time.After(kvstore.HeartbeatWriteInterval) + } + }() + + log.Info("Initialization complete") + + <-shutdownSignal + log.Info("Received termination signal. 
Shutting down") + return +} diff --git a/clustermesh-apiserver/test/mock.json b/clustermesh-apiserver/test/mock.json new file mode 100644 index 0000000000000..a989be5ef7d90 --- /dev/null +++ b/clustermesh-apiserver/test/mock.json @@ -0,0 +1,15 @@ +{"kind": "CiliumEndpoint", "metadata": {"name": "pod-1"}, "identity": {"id": 1111, "labels": ["name=id1"]}, "networking": {"addressing": [{"ipv4": "10.1.1.1"}], "node": "1.1.1.1"}, "state": "ready"} +{"kind": "CiliumEndpoint", "metadata": {"name": "pod-2"}, "identity": {"id": 1111, "labels": ["name=id1"]}, "networking": {"addressing": [{"ipv4": "10.1.1.1"}], "node": "1.1.1.1"}, "state": "ready"} +{"kind": "CiliumEndpoint", "metadata": {"name": "pod-3"}, "identity": {"id": 2222, "labels": ["name=id2"]}, "networking": {"addressing": [{"ipv4": "10.1.2.1"}], "node": "1.1.1.2"}, "state": "ready"} +{"kind": "CiliumEndpoint", "metadata": {"name": "pod-4"}, "identity": {"id": 2222, "labels": ["name=id2"]}, "networking": {"addressing": [{"ipv4": "10.1.2.1"}], "node": "1.1.1.2"}, "state": "ready"} +{"kind": "CiliumEndpoint", "metadata": {"name": "pod-5"}, "identity": {"id": 3333, "labels": ["name=id3"]}, "networking": {"addressing": [{"ipv4": "10.1.3.1"}], "node": "1.1.1.3"}, "state": "ready"} +{"kind": "CiliumEndpoint", "metadata": {"name": "pod-6"}, "identity": {"id": 3333, "labels": ["name=id3"]}, "networking": {"addressing": [{"ipv4": "10.1.3.1"}], "node": "1.1.1.3"}, "state": "ready"} +{"kind": "CiliumNode", "metadata": {"name": "node1"}, "spec": {"addresses": [{"type": "InternalIP", "ip": "1.1.1.1"}]}} +{"kind": "CiliumNode", "metadata": {"name": "node2"}, "spec": {"addresses": [{"type": "InternalIP", "ip": "1.1.1.2"}]}} +{"kind": "CiliumNode", "metadata": {"name": "node3"}, "spec": {"addresses": [{"type": "InternalIP", "ip": "1.1.1.3"}]}} +{"kind": "CiliumNode", "metadata": {"name": "node4"}, "spec": {"addresses": [{"type": "InternalIP", "ip": "1.1.1.4"}]}} +{"kind": "CiliumIdentity", "metadata": {"name": "1111"}, "security-labels": {"name": "id1"}} +{"kind": "CiliumIdentity", "metadata": {"name": "2222"}, "security-labels": {"name": "id2"}} +{"kind": "CiliumIdentity", "metadata": {"name": "3333"}, "security-labels": {"name": "id3"}} +{"kind": "Service", "metadata": {"name": "foo", "namespace": "default", "annotations": {"io.cilium/global-service": "true"}}, "spec": {"clusterIP": "20.10.10.10", "ports": [{"name": "https", "port": 443, "protocol": "TCP", "targetPort": 443}], "type": "ClusterIP"}} +{"kind": "Endpoints", "metadata": {"name": "foo", "namespace": "default"}, "subsets": [{"addresses": [{"ip": "34.90.138.138"}], "ports": [{"name": "https", "port": 443, "protocol": "TCP"}]}]} diff --git a/clustermesh-apiserver/test/runtime.yaml b/clustermesh-apiserver/test/runtime.yaml new file mode 100644 index 0000000000000..0ee2400252813 --- /dev/null +++ b/clustermesh-apiserver/test/runtime.yaml @@ -0,0 +1,8 @@ +apiVersion: cilium.io/v2 +kind: CiliumExternalWorkload +metadata: + name: runtime + labels: + app: runtime +spec: + ipv4-alloc-cidr: 10.192.1.0/30 diff --git a/clustermesh-apiserver/test/testccnp.yaml b/clustermesh-apiserver/test/testccnp.yaml new file mode 100644 index 0000000000000..a7c15b29b8ef7 --- /dev/null +++ b/clustermesh-apiserver/test/testccnp.yaml @@ -0,0 +1,17 @@ +apiVersion: cilium.io/v2 +kind: CiliumClusterwideNetworkPolicy +metadata: + name: test-ccnp + namespace: kube-system +spec: + endpointSelector: + matchLabels: + k8s-app: clustermesh-apiserver + ingress: + - fromEndpoints: + - matchLabels: + io.kubernetes.pod.name: runtime + 
- toPorts: + - ports: + - port: "2379" + protocol: TCP diff --git a/clustermesh-apiserver/tls.rst b/clustermesh-apiserver/tls.rst new file mode 100644 index 0000000000000..789d93f6a39c4 --- /dev/null +++ b/clustermesh-apiserver/tls.rst @@ -0,0 +1,240 @@ +Create TLS Secrets for External Workloads using cilium-certgen +============================================================== + +Create TLS secrets into your k8s cluster: + +.. parsed-literal:: + + $ cilium-certgen --k8s-kubeconfig-path ~/.kube/config + +This takes care of the TLS config on your k8s cluster, but external +workloads need TLS config too. To this end, Cilium Agent in the VM +needs to be configured with the following options: + +.. parsed-literal:: + + --join-cluster --kvstore etcd --kvstore-opt etcd.config=/var/lib/cilium/etcd/config.yaml + +``/var/lib/cilium/etcd/config.yaml`` in the VM needs to contain the following config: + +.. parsed-literal:: + + trusted-ca-file: /var/lib/cilium/etcd/ca.crt + cert-file: /var/lib/cilium/etcd/tls.crt + key-file: /var/lib/cilium/etcd/tls.key + endpoints: + - https://clustermesh-apiserver.cilium.io:32379 + +Certificate files in ``/var/lib/cilium/etcd/`` can be extracted from the k8s secrets created above: + +.. parsed-literal:: + + $ kubectl -n kube-system get secret externalworkload-client-certs -o jsonpath="{.data['ca\.crt']}" | base64 --decode >ca.crt + $ kubectl -n kube-system get secret externalworkload-client-certs -o jsonpath="{.data['tls\.crt']}" | base64 --decode >tls.crt + $ kubectl -n kube-system get secret externalworkload-client-certs -o jsonpath="{.data['tls\.key']}" | base64 --decode >tls.key + +Alternatively, all the secrets can be created manually using openssl as instructed below. + +Finally, add ``clustermesh-apiserver.cilium.io`` into ``/etc/hosts``, +using an externally accessible service IP from your cluster: + +.. parsed-literal:: + + 192.168.36.11 clustermesh-apiserver.cilium.io + +Manual instructions using openssl +================================= + +Create an Internal Certificate Authority (CA) +--------------------------------------------- + +.. parsed-literal:: + + $ openssl req -nodes -new -x509 -keyout VMCA.key -sha256 -days 1825 -out VMCA.crt -subj '/CN=clustermesh-apiserver-ca.cilium.io' + +Generate CA private key named 'VMCA.key': + +.. parsed-literal:: + + $ openssl genrsa -des3 -out VMCA.key 2048 + +Enter any password, just remember it for some of the later steps. + +Generate CA certificate from the private key: + +.. parsed-literal:: + + $ openssl req -x509 -new -nodes -key VMCA.key -sha256 -days 1825 -out VMCA.crt + +The values you enter for each prompt do not need to be any specific value, and do not need to be +accurate. + +Create Private Key and Certificate Signing Request for clustermesh-apiserver +---------------------------------------------------------------------------- + +Generate an internal private key for clustermesh-apiserver + +First create the private key: + +.. parsed-literal:: + + $ openssl genrsa -out clustermesh-apiserver.key 2048 + +Next, create a certificate signing request: + +.. parsed-literal:: + + $ openssl req -new -key clustermesh-apiserver.key -out clustermesh-apiserver.csr -subj '/CN=clustermesh-apiserver.cilium.io' + +.. note:: + + You may need to comment out the ``RANDFILE = $ENV::HOME/.rnd`` line from ``/etc/ssl/openssl.cnf`` for this to work. + + +Use the internal CA private key to create a signed certificate: + +.. 
parsed-literal:: + + $ openssl x509 -req -days 360 -in clustermesh-apiserver.csr -CA VMCA.crt -CAkey VMCA.key -CAcreateserial \ + -out clustermesh-apiserver.crt -sha256 \ + -extfile <(printf "extendedKeyUsage=clientAuth,serverAuth\nsubjectAltName=DNS:clustermesh-apiserver.cilium.io,IP:127.0.0.1") + +Next we create a Kubernetes secret that includes both the CA certificate, +and private key and signed certificates for clustermesh-apiserver: + +.. parsed-literal:: + + $ kubectl create secret generic externalworkload-server-certs -n kube-system \ + --from-file=ca.crt=VMCA.crt \ + --from-file=tls.crt=clustermesh-apiserver.crt \ + --from-file=tls.key=clustermesh-apiserver.key + +Create Private Key and Certificate Signing Request for clustermesh-apiserver-admin +---------------------------------------------------------------------------------- + +Generate an internal private key for clustermesh-apiserver + +First create the private key: + +.. parsed-literal:: + + $ openssl genrsa -out clustermesh-apiserver-admin.key 2048 + +Next, create a certificate signing request: + +.. parsed-literal:: + + $ openssl req -new -key clustermesh-apiserver-admin.key -out clustermesh-apiserver-admin.csr -subj '/CN=root' + +.. note:: + + You may need to comment out the ``RANDFILE = $ENV::HOME/.rnd`` line from ``/etc/ssl/openssl.cnf`` for this to work. + + +Use the internal CA private key to create a signed certificate: + +.. parsed-literal:: + + $ openssl x509 -req -days 360 -in clustermesh-apiserver-admin.csr -CA VMCA.crt -CAkey VMCA.key -CAcreateserial \ + -out clustermesh-apiserver-admin.crt -sha256 \ + -extfile <(printf "extendedKeyUsage=clientAuth,serverAuth\nsubjectAltName=DNS:localhost") + +Next we create a Kubernetes secret that includes both the CA certificate, +and private key and signed certificates for clustermesh-apiserver-admin: + +.. parsed-literal:: + + $ kubectl create secret generic externalworkload-admin-certs -n kube-system \ + --from-file=ca.crt=VMCA.crt \ + --from-file=tls.crt=clustermesh-apiserver-admin.crt \ + --from-file=tls.key=clustermesh-apiserver-admin.key + +Create Private Key and Certificate for a VM +------------------------------------------- + +Generate an internal private key for VM "runtime" + +First create the private key: + +.. parsed-literal:: + + $ openssl genrsa -out client.key 2048 + +Next, create a certificate signing request: + +.. parsed-literal:: + + $ openssl req -new -key client.key -out client.csr -subj '/CN=externalworkload' + +Use the internal CA private key to create a signed certificate: + +.. parsed-literal:: + + $ openssl x509 -req -days 360 -in client.csr -CA VMCA.crt -CAkey VMCA.key -CAcreateserial \ + -out client.crt -sha256 \ + -extfile <(printf "extendedKeyUsage=clientAuth,serverAuth\nsubjectAltName=DNS:externalworkload") + +Next we store the client certificate as a Kubernetes secret that includes both the CA certificate, +and private key and signed certificates for clustermesh-apiserver-client: + +.. parsed-literal:: + + $ kubectl create secret generic externalworkload-client-certs -n kube-system \ + --from-file=ca.crt=VMCA.crt \ + --from-file=tls.crt=client.crt \ + --from-file=tls.key=client.key + +Configure Cilium agent on the VM +-------------------------------- + +Cilium Agent in the VM needs to be configured with the following options: + +.. parsed-literal:: + + --join-cluster --kvstore etcd --kvstore-opt etcd.config=/var/lib/cilium/etcd/config.yaml + +Create ``/var/lib/cilium/etcd/config.yaml`` with the following contents: + +.. 
parsed-literal::
+
+    ---
+    trusted-ca-file: /var/lib/cilium/etcd/ca.crt
+    cert-file: /var/lib/cilium/etcd/tls.crt
+    key-file: /var/lib/cilium/etcd/tls.key
+    endpoints:
+    - https://clustermesh-apiserver.cilium.io:32379
+
+Place the certificates into ``/var/lib/cilium/etcd`` in the VM:
+
+.. parsed-literal::
+
+    $ cp VMCA.crt /var/lib/cilium/etcd/ca.crt
+    $ cp client.crt /var/lib/cilium/etcd/tls.crt
+    $ cp client.key /var/lib/cilium/etcd/tls.key
+
+Finally, add ``clustermesh-apiserver.cilium.io`` into ``/etc/hosts`` using an
+externally accessible service IP from your cluster:
+
+.. parsed-literal::
+
+    192.168.36.11 clustermesh-apiserver.cilium.io
+
+Starting Cilium in a Container in a VM
+======================================
+
+Mount the BPF filesystem in the VM, then run the Cilium agent in a container,
+mounting in the etcd configuration and certificates prepared above and mapping
+``clustermesh-apiserver.cilium.io`` to the externally accessible service IP of
+your cluster. Adjust the host side of the ``--volume`` mount (for example
+``/home/vagrant/cilium/etcd``) to wherever the certificates were placed:
+
+.. parsed-literal::
+
+    $ sudo mount bpffs -t bpf /sys/fs/bpf
+    $ docker run -d --name cilium --restart always --network host \
+        --privileged --cap-add ALL --log-driver syslog \
+        --add-host clustermesh-apiserver.cilium.io:192.168.36.11 \
+        --volume /var/lib/cilium/etcd:/var/lib/cilium/etcd \
+        cilium/cilium-dev:testing \
+        /usr/bin/cilium-agent --kvstore etcd \
+        --kvstore-opt etcd.config=/var/lib/cilium/etcd/config.yaml \
+        --ipv4-node 192.168.36.10 --join-cluster
diff --git a/clustermesh-apiserver/vmmanager.go b/clustermesh-apiserver/vmmanager.go
new file mode 100644
index 0000000000000..55edee12c1471
--- /dev/null
+++ b/clustermesh-apiserver/vmmanager.go
@@ -0,0 +1,454 @@
+// Copyright 2018-2020 Authors of Cilium
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package main + +import ( + "context" + "encoding/json" + "net" + "sort" + + "github.com/cilium/cilium/api/v1/models" + "github.com/cilium/cilium/pkg/cidr" + "github.com/cilium/cilium/pkg/identity" + identityCache "github.com/cilium/cilium/pkg/identity/cache" + identitymodel "github.com/cilium/cilium/pkg/identity/model" + "github.com/cilium/cilium/pkg/k8s" + k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io" + ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + clientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned" + "github.com/cilium/cilium/pkg/k8s/informer" + k8sversion "github.com/cilium/cilium/pkg/k8s/version" + "github.com/cilium/cilium/pkg/kvstore/store" + "github.com/cilium/cilium/pkg/labels" + nodeTypes "github.com/cilium/cilium/pkg/node/types" + "github.com/cilium/cilium/pkg/option" + + k8sv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" +) + +type VMManager struct { + ciliumClient clientset.Interface + identityAllocator *identityCache.CachingIdentityAllocator + + ciliumExternalWorkloadStore cache.Store + ciliumExternalWorkloadInformer cache.Controller +} + +func NewVMManager(ciliumK8sClient clientset.Interface) *VMManager { + m := &VMManager{ + ciliumClient: ciliumK8sClient, + } + m.identityAllocator = identityCache.NewCachingIdentityAllocator(m) + + if option.Config.EnableWellKnownIdentities { + identity.InitWellKnownIdentities(option.Config) + } + m.identityAllocator.InitIdentityAllocator(ciliumK8sClient, identityStore) + m.startCiliumExternalWorkloadWatcher() + return m +} + +func (m *VMManager) startCiliumExternalWorkloadWatcher() { + m.ciliumExternalWorkloadStore, m.ciliumExternalWorkloadInformer = informer.NewInformer( + cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(), + ciliumv2.CEWPluralName, k8sv1.NamespaceAll, fields.Everything()), + &ciliumv2.CiliumExternalWorkload{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + if cew, ok := obj.(*ciliumv2.CiliumExternalWorkload); ok { + log.Debugf("Added CEW: %v", cew) + } + }, + UpdateFunc: func(_, newObj interface{}) { + if cew, ok := newObj.(*ciliumv2.CiliumExternalWorkload); ok { + log.Debugf("Updated CEW: %v", cew) + } + }, + DeleteFunc: func(obj interface{}) { + deletedObj, ok := obj.(cache.DeletedFinalStateUnknown) + if ok { + obj = deletedObj.Obj + } + if cew, ok := obj.(*ciliumv2.CiliumExternalWorkload); ok { + log.Debugf("Deleted CEW: %v", cew) + } + }, + }, + k8s.ConvertToCiliumExternalWorkload, + ) + + go m.ciliumExternalWorkloadInformer.Run(wait.NeverStop) +} + +// +// IdentityAllocatorOwner interface +// + +// UpdateIdentities will be called when identities have changed +func (m *VMManager) UpdateIdentities(added, deleted identityCache.IdentityCache) {} + +// GetSuffix must return the node specific suffix to use +func (m *VMManager) GetNodeSuffix() string { + return "vm-allocator" +} + +func nodeOverrideFromCEW(n *nodeTypes.RegisterNode, cew *ciliumv2.CiliumExternalWorkload) *nodeTypes.RegisterNode { + nk := n.DeepCopy() + + nk.Labels = make(map[string]string, len(cew.Labels)) + for k, v := range cew.Labels { + nk.Labels[k] = v + } + + // Default pod name and namespace labels + if nk.Labels[k8sConst.PodNamespaceLabel] == "" { + nk.Labels[k8sConst.PodNamespaceLabel] = "default" + } + if nk.Labels[k8sConst.PodNameLabel] == "" { + 
nk.Labels[k8sConst.PodNameLabel] = cew.Name + } + + // Override cluster + nk.Cluster = clusterName + nk.ClusterID = clusterID + nk.Labels[k8sConst.PolicyLabelCluster] = clusterName + + // Override CIDRs if defined + if cew.Spec.IPv4AllocCIDR != "" { + if cidr, err := cidr.ParseCIDR(cew.Spec.IPv4AllocCIDR); err == nil { + if ip4 := cidr.IP.To4(); ip4 != nil { + nk.IPv4AllocCIDR = cidr + } else { + log.Warning("CEW: ipv4-alloc-cidr is not IPv4") + } + } else { + log.WithError(err).Warningf("CEW: parse error on %s", cew.Spec.IPv4AllocCIDR) + } + } + if cew.Spec.IPv6AllocCIDR != "" { + if cidr, err := cidr.ParseCIDR(cew.Spec.IPv6AllocCIDR); err == nil { + if ip6 := cidr.IP.To16(); ip6 != nil { + nk.IPv6AllocCIDR = cidr + } else { + log.Warning("CEW: ipv6-alloc-cidr is not IPv6") + } + } else { + log.WithError(err).Warningf("CEW: parse error on %s", cew.Spec.IPv6AllocCIDR) + } + } + return nk +} + +// +// Observer interface +// + +func (m *VMManager) OnUpdate(k store.Key) { + if n, ok := k.(*nodeTypes.RegisterNode); ok { + // Only handle registration events if CiliumExternalWorkload CRD with a matching name exists + cewObj, exists, _ := m.ciliumExternalWorkloadStore.GetByKey(n.Name) + if !exists { + log.Warningf("CEW: CiliumExternalWorkload resource not found for: %v", n) + return + } + cew, ok := cewObj.(*ciliumv2.CiliumExternalWorkload) + if !ok { + log.Errorf("CEW: CiliumExternalWorkload %s not the right type: %T", n.Name, cewObj) + return + } + + if n.NodeIdentity == 0 { + // Phase 1: Initial registration with zero ID, return configuration + nk := nodeOverrideFromCEW(n, cew) + + log.Debugf("CEW: VM Cilium Node updated: %v -> %v", n, nk) + id := m.AllocateNodeIdentity(nk) + if id != nil { + nid := id.ID.Uint32() + nk.NodeIdentity = nid + + // clear addresses so that we know the registration is not ready yet + nk.IPAddresses = nil + + // Update the registration, now with the node identity and overridden fields + if err := ciliumNodeRegisterStore.UpdateKeySync(context.Background(), nk); err != nil { + log.WithError(err).Warning("CEW: Unable to update register node in etcd") + } else { + log.Debugf("CEW: Updated register node in etcd (nid: %d): %v", nid, nk) + } + } + } else if len(n.IPAddresses) > 0 { + // Phase 2: non-zero ID registration with addresses + + // Override again, just in case the extenal node is misbehaving + nk := nodeOverrideFromCEW(n, cew) + + id := m.LookupNodeIdentity(nk) + if id == nil || id.ID.Uint32() != nk.NodeIdentity { + log.Errorf("CEW: Invalid identity %d in %v", nk.NodeIdentity, nk) + } + + // Create cluster resources for the external node + nodeIP := nk.GetNodeIP(false) + m.UpdateCiliumNodeResource(nk) + m.UpdateCiliumEndpointResource(nk.Name, id, nk.IPAddresses, nodeIP) + + nid := id.ID.Uint32() + + // Update CEW with the identity and IP + cewCopy := cew.DeepCopy() + cewCopy.Status.ID = uint64(nid) + cewCopy.Status.IP = nodeIP.String() + for retryCount := 0; retryCount < maxRetryCount; retryCount++ { + if _, err := m.ciliumClient.CiliumV2().CiliumExternalWorkloads().UpdateStatus(context.TODO(), cewCopy, metav1.UpdateOptions{}); err != nil { + if errors.IsConflict(err) { + log.WithError(err).Warn("CEW: Unable to update CiliumExternalWorkload status, will retry") + continue + } + log.WithError(err).Error("CEW: Unable to update CiliumExternalWorkload status") + } else { + log.Debugf("CEW: Successfully updated CiliumExternalWorkload status: %v", *cewCopy) + break + } + } + } + } else { + log.Errorf("CEW: VM Cilium Node not RegisterNode: %v", k) + } +} + +func (m 
*VMManager) OnDelete(k store.NamedKey) { + log.Debugf("RegisterNode deleted: %v", k.GetKeyName()) +} + +func (m *VMManager) AllocateNodeIdentity(n *nodeTypes.RegisterNode) *identity.Identity { + vmLabels := labels.Map2Labels(n.Labels, "k8s") + + log.Debug("Resolving identity for VM labels") + ctx, cancel := context.WithTimeout(context.TODO(), option.Config.KVstoreConnectivityTimeout) + defer cancel() + + id := m.identityAllocator.LookupIdentity(ctx, vmLabels) + if id != nil { + return id + } + + id, allocated, err := m.identityAllocator.AllocateIdentity(ctx, vmLabels, true) + if err != nil { + log.WithError(err).Error("unable to resolve identity") + } else { + if allocated { + log.Debugf("allocated identity %v", id) + } else { + log.Debugf("identity %v was already allocated", id) + } + } + return id +} + +func (m *VMManager) LookupNodeIdentity(n *nodeTypes.RegisterNode) *identity.Identity { + vmLabels := labels.Map2Labels(n.Labels, "k8s") + + log.Debug("Looking up identity for VM labels") + ctx, cancel := context.WithTimeout(context.TODO(), option.Config.KVstoreConnectivityTimeout) + defer cancel() + + return m.identityAllocator.LookupIdentity(ctx, vmLabels) +} + +const ( + maxRetryCount = 5 +) + +// UpdateCiliumNodeResource updates the CiliumNode resource representing the +// local node +func (m *VMManager) UpdateCiliumNodeResource(n *nodeTypes.RegisterNode) { + nr := n.ToCiliumNode() + + for retryCount := 0; retryCount < maxRetryCount; retryCount++ { + log.Info("Getting CN during an update") + nodeResource, err := m.ciliumClient.CiliumV2().CiliumNodes().Get(context.TODO(), n.Name, metav1.GetOptions{}) + if err != nil { + if _, err = m.ciliumClient.CiliumV2().CiliumNodes().Create(context.TODO(), nr, metav1.CreateOptions{}); err != nil { + if errors.IsConflict(err) { + log.WithError(err).Warn("Unable to create CiliumNode resource, will retry") + continue + } + log.WithError(err).Fatal("Unable to create CiliumNode resource") + } else { + log.Infof("Successfully created CiliumNode resource: %v", *nr) + return + } + } else { + nodeResource.ObjectMeta.Labels = nr.ObjectMeta.Labels + nodeResource.Spec = nr.Spec + if _, err := m.ciliumClient.CiliumV2().CiliumNodes().Update(context.TODO(), nodeResource, metav1.UpdateOptions{}); err != nil { + if errors.IsConflict(err) { + log.WithError(err).Warn("Unable to update CiliumNode resource, will retry") + continue + } + log.WithError(err).Fatal("Unable to update CiliumNode resource") + } else { + log.Infof("Successfully updated CiliumNode resource: %v", *nodeResource) + return + } + } + } + log.Fatal("Could not create or update CiliumNode resource, despite retries") +} + +// UpdateCiliumEndpointResource updates the CiliumNode resource representing the +// local node +func (m *VMManager) UpdateCiliumEndpointResource(name string, id *identity.Identity, ipAddresses []nodeTypes.Address, nodeIP net.IP) { + var addresses []*ciliumv2.AddressPair + i := 0 + for _, addr := range ipAddresses { + if len(addresses) == i { + addresses = append(addresses, &ciliumv2.AddressPair{}) + } + if ipv4 := addr.IP.To4(); ipv4 != nil { + if addresses[i].IPV4 != "" { + addresses = append(addresses, &ciliumv2.AddressPair{}) + i++ + } + addresses[i].IPV4 = ipv4.String() + } else if ipv6 := addr.IP.To16(); ipv6 != nil { + if addresses[i].IPV6 != "" { + addresses = append(addresses, &ciliumv2.AddressPair{}) + i++ + } + addresses[i].IPV6 = ipv6.String() + } + } + + namespace := id.Labels[k8sConst.PodNamespaceLabel].Value + + for retryCount := 0; retryCount < maxRetryCount; 
retryCount++ { + log.Info("Getting Node during an CEP update") + nr, err := m.ciliumClient.CiliumV2().CiliumNodes().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + log.WithError(err).Warn("Unable to get CiliumNode resource, will retry") + continue + } + log.Info("Getting CEP during an initialization") + localCEP, err := m.ciliumClient.CiliumV2().CiliumEndpoints(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + cep := &ciliumv2.CiliumEndpoint{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "cilium.io/v2", + Kind: "CiliumNode", + Name: nr.ObjectMeta.Name, + UID: nr.ObjectMeta.UID, + BlockOwnerDeletion: func() *bool { a := true; return &a }(), + }}, + Labels: map[string]string{ + "name": name, + }, + }, + } + if localCEP, err = m.ciliumClient.CiliumV2().CiliumEndpoints(namespace).Create(context.TODO(), cep, metav1.CreateOptions{}); err != nil { + if errors.IsConflict(err) { + log.WithError(err).Warn("Unable to create CiliumEndpoint resource, will retry") + continue + } + log.WithError(err).Fatal("Unable to create CiliumEndpoint resource") + } + js, _ := json.Marshal(cep) + log.Infof("Successfully created CiliumEndpoint resource %s/%s: %s", namespace, name, js) + js, _ = json.Marshal(localCEP) + log.Infof("Returned CiliumEndpoint resource %s/%s: %s", namespace, name, js) + } + + mdl := ciliumv2.EndpointStatus{ + ID: int64(1), + // ExternalIdentifiers: e.getModelEndpointIdentitiersRLocked(), + Identity: getEndpointIdentity(identitymodel.CreateModel(id)), + Networking: &ciliumv2.EndpointNetworking{ + Addressing: addresses, + NodeIP: nodeIP.String(), + }, + State: string(models.EndpointStateReady), // XXX + // Encryption: ciliumv2.EncryptionSpec{Key: int(n.GetIPsecKeyIdentity())}, + // NamedPorts: e.getNamedPortsModel(), + } + + if k8sversion.Capabilities().Patch { + replaceCEPStatus := []k8s.JSONPatch{ + { + OP: "replace", + Path: "/status", + Value: mdl, + }, + } + var createStatusPatch []byte + createStatusPatch, err = json.Marshal(replaceCEPStatus) + if err != nil { + log.WithError(err).Fatalf("json.Marshal(%v) failed", replaceCEPStatus) + } + localCEP, err = m.ciliumClient.CiliumV2().CiliumEndpoints(namespace).Patch(context.TODO(), name, + types.JSONPatchType, createStatusPatch, metav1.PatchOptions{}, "status") + if err != nil { + if errors.IsConflict(err) { + log.WithError(err).Warn("Unable to update CiliumEndpoint resource, will retry") + continue + } + log.WithError(err).Fatal("Unable to update CiliumEndpoint resource") + } else { + log.Infof("Successfully patched CiliumEndpoint resource: %v", *localCEP) + return + } + } else { + localCEP.Status = mdl + localCEP, err = m.ciliumClient.CiliumV2().CiliumEndpoints(namespace).UpdateStatus(context.TODO(), localCEP, metav1.UpdateOptions{}) + if err != nil { + if errors.IsConflict(err) { + log.WithError(err).Warn("Unable to update CiliumEndpoint resource, will retry") + continue + } + log.WithError(err).Fatal("Unable to update CiliumEndpoint resource") + } else { + log.Infof("Successfully updated CiliumEndpoint resource: %v", *localCEP) + return + } + } + } + log.Fatal("Could not create or update CiliumEndpoint resource, despite retries") +} + +func getEndpointIdentity(mdlIdentity *models.Identity) (identity *ciliumv2.EndpointIdentity) { + if mdlIdentity == nil { + return + } + identity = &ciliumv2.EndpointIdentity{ + ID: mdlIdentity.ID, + } + + identity.Labels = make([]string, len(mdlIdentity.Labels)) + 
copy(identity.Labels, mdlIdentity.Labels) + sort.Strings(identity.Labels) + log.Infof("Got Endpoint Identity: %v", *identity) + return +} diff --git a/contrib/k8s/k8s-extract-clustermesh-nodeport-secret.sh b/contrib/k8s/k8s-extract-clustermesh-nodeport-secret.sh new file mode 100755 index 0000000000000..1ab94e3a1dbed --- /dev/null +++ b/contrib/k8s/k8s-extract-clustermesh-nodeport-secret.sh @@ -0,0 +1,55 @@ +#!/bin/bash +# +# Copyright 2020 Authors of Cilium +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Extract a clustermesh secret from the local cluster to be used in other clusters +set -e + +NAMESPACE=$(kubectl get pod -l k8s-app=clustermesh-apiserver -o jsonpath='{.items[0].metadata.namespace}' --all-namespaces) +NODE_NAME=$(kubectl -n $NAMESPACE get pod -l k8s-app=clustermesh-apiserver -o jsonpath='{.items[0].spec.nodeName}') +NODE_IP=$(kubectl -n $NAMESPACE get node $NODE_NAME -o jsonpath='{.status.addresses[?(@.type=="InternalIP")].address}') +NODE_PORT=$(kubectl -n $NAMESPACE get svc clustermesh-apiserver -o jsonpath='{.spec.ports[0].nodePort}') +CLUSTER_NAME=$(kubectl -n $NAMESPACE get cm cilium-config -o jsonpath='{.data.cluster-name}') +CA_CRT=$(kubectl -n $NAMESPACE get secret clustermesh-apiserver-ca-cert -o jsonpath="{.data['ca\.crt']}") +TLS_CRT=$(kubectl -n $NAMESPACE get secret clustermesh-apiserver-remote-cert -o jsonpath="{.data['tls\.crt']}") +TLS_KEY=$(kubectl -n $NAMESPACE get secret clustermesh-apiserver-remote-cert -o jsonpath="{.data['tls\.key']}") + +define(){ IFS='\n' read -r -d '' ${1} || true; } + +ETCD_CONFIG=`cat <..." + exit 1 +fi + +DATA="" +for file in "$@" +do + DATA+=$(jq -r '.data | to_entries[] | "\"\(.key)\": \"\(.value)\","' $file) +done + +# Remove last comma (smallest suffix matching comma) to make data valid json +DATA=${DATA%,} + +NAMESPACE=$(kubectl get pod -l k8s-app=clustermesh-apiserver -o jsonpath='{.items[0].metadata.namespace}' --all-namespaces) + +cat << EOF | +{ + "apiVersion": "v1", + "kind": "Secret", + "metadata": { + "name": "cilium-clustermesh" + }, + "type": "Opaque", + "data": { + $DATA + } +} +EOF +kubectl -n $NAMESPACE apply -f -
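
Usage sketch for the two contrib scripts added above, following the steps in the
README. The kubectl context names `cluster1` and `cluster2` are assumptions;
substitute the contexts for your own clusters:

    # Extract the clustermesh access secret from each cluster
    $ kubectl config use-context cluster1
    $ contrib/k8s/k8s-extract-clustermesh-nodeport-secret.sh > cluster1-secret.json
    $ kubectl config use-context cluster2
    $ contrib/k8s/k8s-extract-clustermesh-nodeport-secret.sh > cluster2-secret.json

    # Import the other cluster's secret into each cluster
    $ kubectl config use-context cluster1
    $ contrib/k8s/k8s-import-clustermesh-secrets.sh cluster2-secret.json
    $ kubectl config use-context cluster2
    $ contrib/k8s/k8s-import-clustermesh-secrets.sh cluster1-secret.json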