Skip to content

Commit

Permalink
clustermesh-apiserver: Use hive and k8s-client
Browse files Browse the repository at this point in the history
This refactors clustermesh-apiserver to use the k8s-client provided clientset.

The health API server was refactored as a hive cell as it was a low-hanging
fruit.

Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki authored and pchaigno committed Sep 15, 2022
1 parent af61d36 commit 4b4ad4e
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 101 deletions.
74 changes: 74 additions & 0 deletions clustermesh-apiserver/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package main

import (
"context"
"fmt"
"net/http"

"github.com/spf13/pflag"
"go.uber.org/fx"

"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/hive"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/option"
)

type HealthAPIServerConfig struct {
ClusterMeshHealthPort int
}

func (HealthAPIServerConfig) CellFlags(flags *pflag.FlagSet) {
flags.Int(option.ClusterMeshHealthPort, defaults.ClusterMeshHealthPort, "TCP port for ClusterMesh apiserver health API")
}

var healthAPIServerCell = hive.NewCellWithConfig[HealthAPIServerConfig](
"health-api-server",
fx.Provide(newHealthAPIServer),
fx.Invoke(func(HealthAPIServer) {}), // Always instantiate.
)

type HealthAPIServer struct {
*http.Server
}

func newHealthAPIServer(lc fx.Lifecycle, clientset k8sClient.Clientset, cfg HealthAPIServerConfig) HealthAPIServer {
mux := http.NewServeMux()

mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
statusCode := http.StatusOK
reply := "ok"

if _, err := clientset.Discovery().ServerVersion(); err != nil {
statusCode = http.StatusInternalServerError
reply = err.Error()
}
w.WriteHeader(statusCode)
if _, err := w.Write([]byte(reply)); err != nil {
log.WithError(err).Error("Failed to respond to /healthz request")
}
})

srv := &http.Server{
Handler: mux,
Addr: fmt.Sprintf(":%d", cfg.ClusterMeshHealthPort),
}

lc.Append(fx.Hook{
OnStart: func(context.Context) error {
go func() {
log.Info("Started health API")
if err := srv.ListenAndServe(); err != nil {
log.WithError(err).Fatalf("Unable to start health API")
}
}()
return nil
},
OnStop: srv.Shutdown,
})

return HealthAPIServer{srv}
}
142 changes: 47 additions & 95 deletions clustermesh-apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,35 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"path"
"reflect"
"strings"
"time"

gops "github.com/google/gops/agent"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/sys/unix"
"go.uber.org/fx"
k8sv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"

operatorWatchers "github.com/cilium/cilium/operator/watchers"
"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/gops"
"github.com/cilium/cilium/pkg/hive"
"github.com/cilium/cilium/pkg/identity"
identityCache "github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/inctimer"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/k8s"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
clientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
k8sconfig "github.com/cilium/cilium/pkg/k8s/config"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/k8s/informer"
slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
"github.com/cilium/cilium/pkg/k8s/synced"
Expand Down Expand Up @@ -73,22 +72,13 @@ var (

log = logging.DefaultLogger.WithField(logfields.LogSubsys, "clustermesh-apiserver")

rootHive *hive.Hive

rootCmd = &cobra.Command{
Use: "clustermesh-apiserver",
Short: "Run the ClusterMesh apiserver",
Run: func(cmd *cobra.Command, args []string) {
// Open socket for using gops to get stacktraces of the agent.
addr := fmt.Sprintf("127.0.0.1:%d", vp.GetInt(option.GopsPort))
addrField := logrus.Fields{"address": addr}
if err := gops.Listen(gops.Options{
Addr: addr,
ReuseSocketAddrAndPort: true,
}); err != nil {
log.WithError(err).WithFields(addrField).Fatal("Cannot start gops server")
}
log.WithFields(addrField).Info("Started gops server")

runServer(cmd)
rootHive.Run()
},
PreRun: func(cmd *cobra.Command, args []string) {
option.Config.Populate(vp)
Expand All @@ -99,26 +89,40 @@ var (
},
}

mockFile string
clusterID uint32
ciliumK8sClient clientset.Interface
cfg configuration

shutdownSignal = make(chan struct{})
mockFile string
clusterID uint32
cfg configuration

ciliumNodeStore *store.SharedStore

identityStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
)

func installSigHandler() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, unix.SIGINT, unix.SIGTERM)
func init() {
rootHive = hive.New(
vp, rootCmd.Flags(),

go func() {
<-signals
close(shutdownSignal)
}()
gops.Cell,
k8sClient.Cell,
healthAPIServerCell,

hive.Invoke(registerHooks),
)
}

func registerHooks(lc fx.Lifecycle, clientset k8sClient.Clientset) error {
if !clientset.IsEnabled() {
return errors.New("Kubernetes client not configured, cannot continue.")
}

k8s.SetClients(clientset, clientset.Slim(), clientset, clientset)
lc.Append(fx.Hook{
OnStart: func(context.Context) error {
startServer(clientset)
return nil
},
})
return nil
}

func readMockFile(path string) error {
Expand Down Expand Up @@ -190,9 +194,6 @@ func runApiserver() error {
flags.BoolP(option.DebugArg, "D", false, "Enable debugging mode")
option.BindEnv(vp, option.DebugArg)

flags.Int(option.GopsPort, defaults.GopsPortApiserver, "Port for gops server to listen on")
option.BindEnv(vp, option.GopsPort)

flags.Duration(option.CRDWaitTimeout, 5*time.Minute, "Cilium will exit if CRDs are not available within this duration upon startup")
option.BindEnv(vp, option.CRDWaitTimeout)

Expand All @@ -205,12 +206,6 @@ func runApiserver() error {
flags.StringVar(&cfg.clusterName, option.ClusterName, "default", "Cluster name")
option.BindEnv(vp, option.ClusterName)

flags.String(option.K8sKubeConfigPath, "", "Absolute path of the kubernetes kubeconfig file")
option.BindEnv(vp, option.K8sKubeConfigPath)

flags.Int(option.ClusterMeshHealthPort, defaults.ClusterMeshHealthPort, "TCP port for ClusterMesh apiserver health API")
option.BindEnv(vp, option.ClusterMeshHealthPort)

flags.StringVar(&mockFile, "mock-file", "", "Read from mock file")

flags.Duration(option.KVstoreConnectivityTimeout, defaults.KVstoreConnectivityTimeout, "Time after which an incomplete kvstore operation is considered failed")
Expand Down Expand Up @@ -249,44 +244,11 @@ func runApiserver() error {
}

func main() {
installSigHandler()

if err := runApiserver(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}

func startApi() {
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
statusCode := http.StatusOK
reply := "ok"

if _, err := k8s.Client().Discovery().ServerVersion(); err != nil {
statusCode = http.StatusInternalServerError
reply = err.Error()
}
w.WriteHeader(statusCode)
if _, err := w.Write([]byte(reply)); err != nil {
log.WithError(err).Error("Failed to respond to /healthz request")
}
})

srv := &http.Server{Addr: fmt.Sprintf(":%d", option.Config.ClusterMeshHealthPort)}

go func() {
if err := srv.ListenAndServe(); err != nil {
log.WithError(err).Fatalf("Unable to start health API")
}

<-shutdownSignal
if err := srv.Shutdown(context.Background()); err != nil {
log.WithError(err).Error("Unable to shutdown health API")
}
}()
log.Info("Started health API")
}

func parseLabelArrayFromMap(base map[string]string) labels.LabelArray {
array := make(labels.LabelArray, 0, len(base))
for sourceAndKey, value := range base {
Expand Down Expand Up @@ -347,9 +309,9 @@ func deleteIdentity(obj interface{}) {
}
}

func synchronizeIdentities() {
func synchronizeIdentities(clientset k8sClient.Clientset) {
identityInformer := informer.NewInformerWithStore(
cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(),
cache.NewListWatchFromClient(clientset.CiliumV2().RESTClient(),
"ciliumidentities", k8sv1.NamespaceAll, fields.Everything()),
&ciliumv2.CiliumIdentity{},
0,
Expand Down Expand Up @@ -411,9 +373,9 @@ func deleteNode(obj interface{}) {
}
}

func synchronizeNodes() {
func synchronizeNodes(clientset k8sClient.Clientset) {
_, ciliumNodeInformer := informer.NewInformer(
cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(),
cache.NewListWatchFromClient(clientset.CiliumV2().RESTClient(),
"ciliumnodes", k8sv1.NamespaceAll, fields.Everything()),
&ciliumv2.CiliumNode{},
0,
Expand Down Expand Up @@ -535,9 +497,9 @@ func deleteEndpoint(obj interface{}) {
}
}

func synchronizeCiliumEndpoints() {
func synchronizeCiliumEndpoints(clientset k8sClient.Clientset) {
_, ciliumEndpointsInformer := informer.NewInformer(
cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(),
cache.NewListWatchFromClient(clientset.CiliumV2().RESTClient(),
"ciliumendpoints", k8sv1.NamespaceAll, fields.Everything()),
&ciliumv2.CiliumEndpoint{},
0,
Expand Down Expand Up @@ -578,24 +540,17 @@ func synchronizeCiliumEndpoints() {
go ciliumEndpointsInformer.Run(wait.NeverStop)
}

func runServer(cmd *cobra.Command) {
func startServer(clientset k8sClient.Clientset) {
log.WithFields(logrus.Fields{
"cluster-name": cfg.clusterName,
"cluster-id": clusterID,
}).Info("Starting clustermesh-apiserver...")

if mockFile == "" {
k8s.Configure("", viper.GetString(option.K8sKubeConfigPath), 0.0, 0)
if err := k8s.Init(k8sconfig.NewDefaultConfiguration()); err != nil {
log.WithError(err).Fatal("Unable to connect to Kubernetes apiserver")
}
synced.SyncCRDs(context.TODO(), synced.AllCRDResourceNames(), &synced.Resources{}, &synced.APIGroups{})
ciliumK8sClient = k8s.CiliumClient()
}

mgr := NewVMManager(ciliumK8sClient)

go startApi()
mgr := NewVMManager(clientset)

var err error
if err = kvstore.Setup(context.Background(), "etcd", option.Config.KVStoreOpt, nil); err != nil {
Expand Down Expand Up @@ -624,10 +579,10 @@ func runServer(cmd *cobra.Command) {
log.WithError(err).Fatal("Unable to read mock file")
}
} else {
synchronizeIdentities()
synchronizeNodes()
synchronizeCiliumEndpoints()
operatorWatchers.StartSynchronizingServices(false, cfg)
synchronizeIdentities(clientset)
synchronizeNodes(clientset)
synchronizeCiliumEndpoints(clientset)
operatorWatchers.StartSynchronizingServices(clientset, false, cfg)
}

go func() {
Expand All @@ -645,7 +600,4 @@ func runServer(cmd *cobra.Command) {
}()

log.Info("Initialization complete")

<-shutdownSignal
log.Info("Received termination signal. Shutting down")
}
13 changes: 7 additions & 6 deletions clustermesh-apiserver/vmmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cilium/cilium/pkg/k8s"
k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
clientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
"github.com/cilium/cilium/pkg/k8s/informer"
"github.com/cilium/cilium/pkg/kvstore"
Expand All @@ -44,23 +45,23 @@ type VMManager struct {
ciliumExternalWorkloadInformer cache.Controller
}

func NewVMManager(ciliumK8sClient clientset.Interface) *VMManager {
func NewVMManager(clientset k8sClient.Clientset) *VMManager {
m := &VMManager{
ciliumClient: ciliumK8sClient,
ciliumClient: clientset,
}
m.identityAllocator = identityCache.NewCachingIdentityAllocator(m)

if option.Config.EnableWellKnownIdentities {
identity.InitWellKnownIdentities(option.Config)
}
m.identityAllocator.InitIdentityAllocator(ciliumK8sClient, identityStore)
m.startCiliumExternalWorkloadWatcher()
m.identityAllocator.InitIdentityAllocator(clientset, identityStore)
m.startCiliumExternalWorkloadWatcher(clientset)
return m
}

func (m *VMManager) startCiliumExternalWorkloadWatcher() {
func (m *VMManager) startCiliumExternalWorkloadWatcher(clientset k8sClient.Clientset) {
m.ciliumExternalWorkloadStore, m.ciliumExternalWorkloadInformer = informer.NewInformer(
cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(),
cache.NewListWatchFromClient(clientset.CiliumV2().RESTClient(),
ciliumv2.CEWPluralName, k8sv1.NamespaceAll, fields.Everything()),
&ciliumv2.CiliumExternalWorkload{},
0,
Expand Down

0 comments on commit 4b4ad4e

Please sign in to comment.