diff --git a/clustermesh-apiserver/health.go b/clustermesh-apiserver/health.go
new file mode 100644
index 0000000000000..a5b5e3bd2c6ca
--- /dev/null
+++ b/clustermesh-apiserver/health.go
@@ -0,0 +1,91 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright Authors of Cilium
+
+package main
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+
+	"github.com/spf13/pflag"
+	"go.uber.org/fx"
+
+	"github.com/cilium/cilium/pkg/defaults"
+	"github.com/cilium/cilium/pkg/hive"
+	k8sClient "github.com/cilium/cilium/pkg/k8s/client"
+	"github.com/cilium/cilium/pkg/option"
+)
+
+// HealthAPIServerConfig is the configuration of the health API server cell.
+type HealthAPIServerConfig struct {
+	// ClusterMeshHealthPort is the TCP port the health API listens on.
+	ClusterMeshHealthPort int
+}
+
+// CellFlags registers the option.ClusterMeshHealthPort flag, defaulting to
+// defaults.ClusterMeshHealthPort.
+func (HealthAPIServerConfig) CellFlags(flags *pflag.FlagSet) {
+	flags.Int(option.ClusterMeshHealthPort, defaults.ClusterMeshHealthPort, "TCP port for ClusterMesh apiserver health API")
+}
+
+// healthAPIServerCell provides newHealthAPIServer and invokes a no-op
+// consumer so the server is constructed even if nothing else depends on it.
+var healthAPIServerCell = hive.NewCellWithConfig[HealthAPIServerConfig](
+	"health-api-server",
+	fx.Provide(newHealthAPIServer),
+	fx.Invoke(func(HealthAPIServer) {}), // Always instantiate.
+)
+
+// HealthAPIServer is the HTTP server exposing the /healthz endpoint.
+type HealthAPIServer struct {
+	*http.Server
+}
+
+// newHealthAPIServer builds the health API server listening on
+// cfg.ClusterMeshHealthPort and ties it to the lifecycle: it starts serving in
+// a goroutine on start and is stopped through http.Server.Shutdown on stop.
+//
+// /healthz replies 200 "ok" when clientset.Discovery().ServerVersion()
+// succeeds, otherwise 500 with the error text as the body.
+func newHealthAPIServer(lc fx.Lifecycle, clientset k8sClient.Clientset, cfg HealthAPIServerConfig) HealthAPIServer {
+	mux := http.NewServeMux()
+
+	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
+		statusCode := http.StatusOK
+		reply := "ok"
+
+		if _, err := clientset.Discovery().ServerVersion(); err != nil {
+			statusCode = http.StatusInternalServerError
+			reply = err.Error()
+		}
+		w.WriteHeader(statusCode)
+		if _, err := w.Write([]byte(reply)); err != nil {
+			log.WithError(err).Error("Failed to respond to /healthz request")
+		}
+	})
+
+	srv := &http.Server{
+		Handler: mux,
+		Addr:    fmt.Sprintf(":%d", cfg.ClusterMeshHealthPort),
+	}
+
+	lc.Append(fx.Hook{
+		OnStart: func(context.Context) error {
+			go func() {
+				log.Info("Started health API")
+				// ListenAndServe returns http.ErrServerClosed once
+				// Shutdown is called from OnStop; that is a clean
+				// exit and must not be escalated to Fatalf.
+				if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+					log.WithError(err).Fatalf("Unable to start health API")
+				}
+			}()
+			return nil
+		},
+		OnStop: srv.Shutdown,
+	})
+
+	return HealthAPIServer{srv}
+}
diff --git a/clustermesh-apiserver/main.go b/clustermesh-apiserver/main.go
index 5916088813a48..a174a82c0641a 100644
--- a/clustermesh-apiserver/main.go
+++ b/clustermesh-apiserver/main.go
@@ -11,21 +11,19 @@ import (
 	"bufio"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"net"
-	"net/http"
 	"os"
-	"os/signal"
 	"path"
 	"reflect"
 	"strings"
 	"time"
 
-	gops "github.com/google/gops/agent"
 	"github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
-	"golang.org/x/sys/unix"
+	"go.uber.org/fx"
 	k8sv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -33,14 +31,15 @@ import (
 
 	operatorWatchers "github.com/cilium/cilium/operator/watchers"
 	"github.com/cilium/cilium/pkg/defaults"
+	"github.com/cilium/cilium/pkg/gops"
+	"github.com/cilium/cilium/pkg/hive"
 	"github.com/cilium/cilium/pkg/identity"
 	identityCache "github.com/cilium/cilium/pkg/identity/cache"
 	"github.com/cilium/cilium/pkg/inctimer"
 	"github.com/cilium/cilium/pkg/ipcache"
 	"github.com/cilium/cilium/pkg/k8s"
 	ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
-	clientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
-	k8sconfig "github.com/cilium/cilium/pkg/k8s/config"
+	k8sClient "github.com/cilium/cilium/pkg/k8s/client"
 	"github.com/cilium/cilium/pkg/k8s/informer"
 	slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
 	"github.com/cilium/cilium/pkg/k8s/synced"
@@ -73,22 +72,13 @@ var (
 
 	log = logging.DefaultLogger.WithField(logfields.LogSubsys, "clustermesh-apiserver")
 
+	rootHive *hive.Hive
+
 	rootCmd = &cobra.Command{
 		Use:   "clustermesh-apiserver",
 		Short: "Run the ClusterMesh apiserver",
 		Run: func(cmd *cobra.Command, args []string) {
-			// Open socket for using gops to get stacktraces of the agent.
-			addr := fmt.Sprintf("127.0.0.1:%d", vp.GetInt(option.GopsPort))
-			addrField := logrus.Fields{"address": addr}
-			if err := gops.Listen(gops.Options{
-				Addr:                   addr,
-				ReuseSocketAddrAndPort: true,
-			}); err != nil {
-				log.WithError(err).WithFields(addrField).Fatal("Cannot start gops server")
-			}
-			log.WithFields(addrField).Info("Started gops server")
-
-			runServer(cmd)
+			rootHive.Run()
 		},
 		PreRun: func(cmd *cobra.Command, args []string) {
 			option.Config.Populate(vp)
@@ -99,26 +89,45 @@ var (
 		},
 	}
 
-	mockFile        string
-	clusterID       uint32
-	ciliumK8sClient clientset.Interface
-	cfg             configuration
-
-	shutdownSignal = make(chan struct{})
+	mockFile  string
+	clusterID uint32
+	cfg       configuration
 
 	ciliumNodeStore *store.SharedStore
 
 	identityStore = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
 )
 
-func installSigHandler() {
-	signals := make(chan os.Signal, 1)
-	signal.Notify(signals, unix.SIGINT, unix.SIGTERM)
+// init builds rootHive from vp, the root command's flag set, the gops,
+// Kubernetes client and health API cells, and an invoke of registerHooks.
+func init() {
+	rootHive = hive.New(
+		vp, rootCmd.Flags(),
 
-	go func() {
-		<-signals
-		close(shutdownSignal)
-	}()
+		gops.Cell,
+		k8sClient.Cell,
+		healthAPIServerCell,
+
+		hive.Invoke(registerHooks),
+	)
+}
+
+// registerHooks returns an error if the Kubernetes client is not enabled.
+// Otherwise it passes the clientset to k8s.SetClients and appends a start
+// hook that calls startServer with that clientset.
+func registerHooks(lc fx.Lifecycle, clientset k8sClient.Clientset) error {
+	if !clientset.IsEnabled() {
+		return errors.New("Kubernetes client not configured, cannot continue.")
+	}
+
+	k8s.SetClients(clientset, clientset.Slim(), clientset, clientset)
+	lc.Append(fx.Hook{
+		OnStart: func(context.Context) error {
+			startServer(clientset)
+			return nil
+		},
+	})
+	return nil
 }
 
 func readMockFile(path string) error {
@@ -190,9 +199,6 @@ func runApiserver() error {
 	flags.BoolP(option.DebugArg, "D", false, "Enable debugging mode")
 	option.BindEnv(vp, option.DebugArg)
 
-	flags.Int(option.GopsPort, defaults.GopsPortApiserver, "Port for gops server to listen on")
-	option.BindEnv(vp, option.GopsPort)
-
 	flags.Duration(option.CRDWaitTimeout, 5*time.Minute, "Cilium will exit if CRDs are not available within this duration upon startup")
 	option.BindEnv(vp, option.CRDWaitTimeout)
 
@@ -205,12 +211,6 @@ func runApiserver() error {
 	flags.StringVar(&cfg.clusterName, option.ClusterName, "default", "Cluster name")
 	option.BindEnv(vp, option.ClusterName)
 
-	flags.String(option.K8sKubeConfigPath, "", "Absolute path of the kubernetes kubeconfig file")
-	option.BindEnv(vp, option.K8sKubeConfigPath)
-
-	flags.Int(option.ClusterMeshHealthPort, defaults.ClusterMeshHealthPort, "TCP port for ClusterMesh apiserver health API")
-	option.BindEnv(vp, option.ClusterMeshHealthPort)
-
 	flags.StringVar(&mockFile, "mock-file", "", "Read from mock file")
 
 	flags.Duration(option.KVstoreConnectivityTimeout, defaults.KVstoreConnectivityTimeout, "Time after which an incomplete kvstore operation is considered failed")
@@ -249,44 +249,11 @@ func runApiserver() error {
 }
 
 func main() {
-	installSigHandler()
-
 	if err := runApiserver(); err != nil {
 		fmt.Fprintln(os.Stderr, err)
 		os.Exit(1)
 	}
 }
-
-func startApi() {
-	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
-		statusCode := http.StatusOK
-		reply := "ok"
-
-		if _, err := k8s.Client().Discovery().ServerVersion(); err != nil {
-			statusCode = http.StatusInternalServerError
-			reply = err.Error()
-		}
-		w.WriteHeader(statusCode)
-		if _, err := w.Write([]byte(reply)); err != nil {
-			log.WithError(err).Error("Failed to respond to /healthz request")
-		}
-	})
-
-	srv := &http.Server{Addr: fmt.Sprintf(":%d", option.Config.ClusterMeshHealthPort)}
-
-	go func() {
-		if err := srv.ListenAndServe(); err != nil {
-			log.WithError(err).Fatalf("Unable to start health API")
-		}
-
-		<-shutdownSignal
-		if err := srv.Shutdown(context.Background()); err != nil {
-			log.WithError(err).Error("Unable to shutdown health API")
-		}
-	}()
-	log.Info("Started health API")
-}
-
 func parseLabelArrayFromMap(base map[string]string) labels.LabelArray {
 	array := make(labels.LabelArray, 0, len(base))
 	for sourceAndKey, value := range base {
@@ -347,9 +314,9 @@ func deleteIdentity(obj interface{}) {
 	}
 }
 
-func synchronizeIdentities() {
+func synchronizeIdentities(clientset k8sClient.Clientset) {
 	identityInformer := informer.NewInformerWithStore(
-		cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(),
+		cache.NewListWatchFromClient(clientset.CiliumV2().RESTClient(),
 			"ciliumidentities", k8sv1.NamespaceAll, fields.Everything()),
 		&ciliumv2.CiliumIdentity{},
 		0,
@@ -411,9 +378,9 @@ func deleteNode(obj interface{}) {
 	}
 }
 
-func synchronizeNodes() {
+func synchronizeNodes(clientset k8sClient.Clientset) {
 	_, ciliumNodeInformer := informer.NewInformer(
-		cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(),
+		cache.NewListWatchFromClient(clientset.CiliumV2().RESTClient(),
 			"ciliumnodes", k8sv1.NamespaceAll, fields.Everything()),
 		&ciliumv2.CiliumNode{},
 		0,
@@ -535,9 +502,9 @@ func deleteEndpoint(obj interface{}) {
 	}
 }
 
-func synchronizeCiliumEndpoints() {
+func synchronizeCiliumEndpoints(clientset k8sClient.Clientset) {
 	_, ciliumEndpointsInformer := informer.NewInformer(
-		cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(),
+		cache.NewListWatchFromClient(clientset.CiliumV2().RESTClient(),
 			"ciliumendpoints", k8sv1.NamespaceAll, fields.Everything()),
 		&ciliumv2.CiliumEndpoint{},
 		0,
@@ -578,24 +545,17 @@ func synchronizeCiliumEndpoints() {
 	go ciliumEndpointsInformer.Run(wait.NeverStop)
 }
 
-func runServer(cmd *cobra.Command) {
+func startServer(clientset k8sClient.Clientset) {
 	log.WithFields(logrus.Fields{
 		"cluster-name": cfg.clusterName,
 		"cluster-id":   clusterID,
 	}).Info("Starting clustermesh-apiserver...")
 
 	if mockFile == "" {
-		k8s.Configure("", viper.GetString(option.K8sKubeConfigPath), 0.0, 0)
-		if err := k8s.Init(k8sconfig.NewDefaultConfiguration()); err != nil {
-			log.WithError(err).Fatal("Unable to connect to Kubernetes apiserver")
-		}
 		synced.SyncCRDs(context.TODO(), synced.AllCRDResourceNames(), &synced.Resources{}, &synced.APIGroups{})
-		ciliumK8sClient = k8s.CiliumClient()
 	}
 
-	mgr := NewVMManager(ciliumK8sClient)
-
-	go startApi()
+	mgr := NewVMManager(clientset)
 
 	var err error
 	if err = kvstore.Setup(context.Background(), "etcd", option.Config.KVStoreOpt, nil); err != nil {
@@ -624,10 +584,10 @@ func runServer(cmd *cobra.Command) {
 			log.WithError(err).Fatal("Unable to read mock file")
 		}
 	} else {
-		synchronizeIdentities()
-		synchronizeNodes()
-		synchronizeCiliumEndpoints()
-		operatorWatchers.StartSynchronizingServices(false, cfg)
+		synchronizeIdentities(clientset)
+		synchronizeNodes(clientset)
+		synchronizeCiliumEndpoints(clientset)
+		operatorWatchers.StartSynchronizingServices(clientset, false, cfg)
 	}
 
 	go func() {
@@ -645,7 +605,4 @@ func runServer(cmd *cobra.Command) {
 	}()
 
 	log.Info("Initialization complete")
-
-	<-shutdownSignal
-	log.Info("Received termination signal. Shutting down")
 }
diff --git a/clustermesh-apiserver/vmmanager.go b/clustermesh-apiserver/vmmanager.go
index 648afffaca9c6..3be155a157611 100644
--- a/clustermesh-apiserver/vmmanager.go
+++ b/clustermesh-apiserver/vmmanager.go
@@ -26,6 +26,7 @@ import (
 	"github.com/cilium/cilium/pkg/k8s"
 	k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io"
 	ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
+	k8sClient "github.com/cilium/cilium/pkg/k8s/client"
 	clientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned"
 	"github.com/cilium/cilium/pkg/k8s/informer"
 	"github.com/cilium/cilium/pkg/kvstore"
@@ -44,23 +45,26 @@ type VMManager struct {
 	ciliumExternalWorkloadInformer cache.Controller
 }
 
-func NewVMManager(ciliumK8sClient clientset.Interface) *VMManager {
+// NewVMManager creates a VMManager using clientset as its Cilium client,
+// initializes its identity allocator with clientset and identityStore, and
+// starts the CiliumExternalWorkload watcher.
+func NewVMManager(clientset k8sClient.Clientset) *VMManager {
 	m := &VMManager{
-		ciliumClient: ciliumK8sClient,
+		ciliumClient: clientset,
 	}
 	m.identityAllocator = identityCache.NewCachingIdentityAllocator(m)
 
 	if option.Config.EnableWellKnownIdentities {
 		identity.InitWellKnownIdentities(option.Config)
 	}
-	m.identityAllocator.InitIdentityAllocator(ciliumK8sClient, identityStore)
-	m.startCiliumExternalWorkloadWatcher()
+	m.identityAllocator.InitIdentityAllocator(clientset, identityStore)
+	m.startCiliumExternalWorkloadWatcher(clientset)
 	return m
 }
 
-func (m *VMManager) startCiliumExternalWorkloadWatcher() {
+func (m *VMManager) startCiliumExternalWorkloadWatcher(clientset k8sClient.Clientset) {
 	m.ciliumExternalWorkloadStore, m.ciliumExternalWorkloadInformer = informer.NewInformer(
-		cache.NewListWatchFromClient(ciliumK8sClient.CiliumV2().RESTClient(),
+		cache.NewListWatchFromClient(clientset.CiliumV2().RESTClient(),
 			ciliumv2.CEWPluralName, k8sv1.NamespaceAll, fields.Everything()),
 		&ciliumv2.CiliumExternalWorkload{},
 		0,