Skip to content

Commit

Permalink
Move the kubelet and config outside of pkg
Browse files Browse the repository at this point in the history
Signed-off-by: galal-hussein <[email protected]>
  • Loading branch information
galal-hussein committed Oct 15, 2024
1 parent 6b02fd9 commit 4035566
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 115 deletions.
72 changes: 72 additions & 0 deletions k3k-kubelet/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"errors"
"os"

"gopkg.in/yaml.v2"
)

// Config has all virtual-kubelet startup options
type config struct {
clusterName string `yaml:"clusterName"`
clusterNamespace string `yaml:"clusterNamespace"`
hostConfigPath string `yaml:"hostConfigPath"`
virtualConfigPath string `yaml:"virtualConfigPath"`
kubeletPort string `yaml:"kubeletPort"`
nodeName string `yaml:"nodeName"`
agentPodIP string `yaml:"agentPodIP"`
token string `yaml:"token"`
}

func (t *config) UnmarshalYAML(data []byte) error {
var c config
if err := yaml.Unmarshal(data, &c); err != nil {
return err
}
if t.clusterName == "" {
t.clusterName = c.clusterName
}
if t.clusterNamespace == "" {
t.clusterNamespace = c.clusterNamespace
}
if t.hostConfigPath == "" {
t.hostConfigPath = c.hostConfigPath
}
if t.virtualConfigPath == "" {
t.virtualConfigPath = c.virtualConfigPath
}
if t.kubeletPort == "" {
t.kubeletPort = c.kubeletPort
}
if t.nodeName == "" {
t.nodeName = c.nodeName
}

return nil
}

func (t *config) Validate() error {
if t.clusterName == "" {
return errors.New("cluster name is not provided")
}
if t.clusterNamespace == "" {
return errors.New("cluster namespace is not provided")
}
if t.agentPodIP == "" {
return errors.New("agent POD IP is not provided")
}
return nil
}

func (t *config) Parse(path string) error {
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil
}

configFileBytes, err := os.ReadFile(path)
if err != nil {
return err
}
return t.UnmarshalYAML(configFileBytes)
}
72 changes: 0 additions & 72 deletions k3k-kubelet/config/config.go

This file was deleted.

59 changes: 28 additions & 31 deletions k3k-kubelet/kubelet/kubelet.go → k3k-kubelet/kubelet.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kubelet
package main

import (
"context"
Expand All @@ -11,7 +11,6 @@ import (
"time"

certutil "github.com/rancher/dynamiclistener/cert"
"github.com/rancher/k3k/k3k-kubelet/config"
"github.com/rancher/k3k/k3k-kubelet/provider"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
"github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap"
Expand Down Expand Up @@ -49,19 +48,17 @@ func init() {
_ = v1alpha1.AddToScheme(Scheme)
}

type Kubelet struct {
Name string
ServerName string
Port int
TLSConfig *tls.Config
HostConfig *rest.Config
HostClient ctrlruntimeclient.Client
VirtClient kubernetes.Interface
Node *nodeutil.Node
type kubelet struct {
name string
port int
hostConfig *rest.Config
hostClient ctrlruntimeclient.Client
virtClient kubernetes.Interface
node *nodeutil.Node
}

func New(c *config.Config) (*Kubelet, error) {
hostConfig, err := clientcmd.BuildConfigFromFlags("", c.HostConfigPath)
func newKubelet(c *config) (*kubelet, error) {
hostConfig, err := clientcmd.BuildConfigFromFlags("", c.hostConfigPath)
if err != nil {
return nil, err
}
Expand All @@ -73,7 +70,7 @@ func New(c *config.Config) (*Kubelet, error) {
return nil, err
}

virtConfig, err := virtRestConfig(context.Background(), c.VirtualConfigPath, hostClient, c.ClusterName, c.ClusterNamespace)
virtConfig, err := virtRestConfig(context.Background(), c.virtualConfigPath, hostClient, c.clusterName, c.clusterNamespace)
if err != nil {
return nil, err
}
Expand All @@ -82,27 +79,27 @@ func New(c *config.Config) (*Kubelet, error) {
if err != nil {
return nil, err
}
return &Kubelet{
Name: c.NodeName,
HostConfig: hostConfig,
HostClient: hostClient,
VirtClient: virtClient,
return &kubelet{
name: c.nodeName,
hostConfig: hostConfig,
hostClient: hostClient,
virtClient: virtClient,
}, nil
}

func (k *Kubelet) RegisterNode(srvPort, namespace, name, podIP string) error {
func (k *kubelet) RegisterNode(srvPort, namespace, name, podIP string) error {
providerFunc := k.newProviderFunc(namespace, name, podIP)
nodeOpts := k.nodeOpts(srvPort, namespace, name, podIP)

var err error
k.Node, err = nodeutil.NewNode(k.Name, providerFunc, nodeutil.WithClient(k.VirtClient), nodeOpts)
k.node, err = nodeutil.NewNode(k.name, providerFunc, nodeutil.WithClient(k.virtClient), nodeOpts)
if err != nil {
return fmt.Errorf("unable to start kubelet: %v", err)
}
return nil
}

func (k *Kubelet) Start(ctx context.Context) {
func (k *kubelet) Start(ctx context.Context) {
go func() {
ctx := context.Background()
logger, err := zap.NewProduction()
Expand All @@ -114,38 +111,38 @@ func (k *Kubelet) Start(ctx context.Context) {
*logger.Sugar(),
}
ctx = log.WithLogger(ctx, &wrapped)
err = k.Node.Run(ctx)
err = k.node.Run(ctx)
if err != nil {
fmt.Printf("node errored when running: %s \n", err.Error())
os.Exit(-1)
}
}()
if err := k.Node.WaitReady(context.Background(), time.Minute*1); err != nil {
if err := k.node.WaitReady(context.Background(), time.Minute*1); err != nil {
fmt.Printf("node was not ready within timeout of 1 minute: %s \n", err.Error())
os.Exit(-1)
}
<-k.Node.Done()
if err := k.Node.Err(); err != nil {
<-k.node.Done()
if err := k.node.Err(); err != nil {
fmt.Printf("node stopped with an error: %s \n", err.Error())
os.Exit(-1)
}
fmt.Printf("node exited without an error")
}

func (k *Kubelet) newProviderFunc(namespace, name, podIP string) nodeutil.NewProviderFunc {
func (k *kubelet) newProviderFunc(namespace, name, podIP string) nodeutil.NewProviderFunc {
return func(pc nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) {
utilProvider, err := provider.New(*k.HostConfig, namespace, name)
utilProvider, err := provider.New(*k.hostConfig, namespace, name)
if err != nil {
return nil, nil, fmt.Errorf("unable to make nodeutil provider %w", err)
}
nodeProvider := provider.Node{}

provider.ConfigureNode(pc.Node, podIP, k.Port)
provider.ConfigureNode(pc.Node, podIP, k.port)
return utilProvider, &nodeProvider, nil
}
}

func (k *Kubelet) nodeOpts(srvPort, namespace, name, podIP string) nodeutil.NodeOpt {
func (k *kubelet) nodeOpts(srvPort, namespace, name, podIP string) nodeutil.NodeOpt {
return func(c *nodeutil.NodeConfig) error {
c.HTTPListenAddr = fmt.Sprintf(":%s", srvPort)
// set up the routes
Expand All @@ -158,7 +155,7 @@ func (k *Kubelet) nodeOpts(srvPort, namespace, name, podIP string) nodeutil.Node

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
tlsConfig, err := loadTLSConfig(ctx, k.HostClient, name, namespace, k.Name, podIP)
tlsConfig, err := loadTLSConfig(ctx, k.hostClient, name, namespace, k.name, podIP)
if err != nil {
return fmt.Errorf("unable to get tls config: %w", err)
}
Expand Down
22 changes: 10 additions & 12 deletions k3k-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ import (
"fmt"
"os"

"github.com/rancher/k3k/k3k-kubelet/config"
"github.com/rancher/k3k/k3k-kubelet/kubelet"
"github.com/sirupsen/logrus"
"github.com/urfave/cli"
)

var (
configFile string
c config.Config
c config
)

func main() {
Expand All @@ -24,44 +22,44 @@ func main() {
cli.StringFlag{
Name: "cluster-name",
Usage: "Name of the k3k cluster",
Destination: &c.ClusterName,
Destination: &c.clusterName,
EnvVar: "CLUSTER_NAME",
},
cli.StringFlag{
Name: "cluster-namespace",
Usage: "Namespace of the k3k cluster",
Destination: &c.ClusterNamespace,
Destination: &c.clusterNamespace,
EnvVar: "CLUSTER_NAMESPACE",
},
cli.StringFlag{
Name: "cluster-token",
Usage: "K3S token of the k3k cluster",
Destination: &c.Token,
Destination: &c.token,
EnvVar: "CLUSTER_Token",
},
cli.StringFlag{
Name: "host-config-path",
Usage: "Path to the host kubeconfig, if empty then virtual-kubelet will use incluster config",
Destination: &c.HostConfigPath,
Destination: &c.hostConfigPath,
EnvVar: "HOST_KUBECONFIG",
},
cli.StringFlag{
Name: "virtual-config-path",
Usage: "Path to the k3k cluster kubeconfig, if empty then virtual-kubelet will create its own config from k3k cluster",
Destination: &c.VirtualConfigPath,
Destination: &c.virtualConfigPath,
EnvVar: "CLUSTER_NAME",
},
cli.StringFlag{
Name: "kubelet-port",
Usage: "kubelet API port number",
Destination: &c.KubeletPort,
Destination: &c.kubeletPort,
EnvVar: "SERVER_PORT",
Value: "9443",
},
cli.StringFlag{
Name: "agent-pod-ip",
Usage: "Agent Pod IP used for TLS SAN for the kubelet server",
Destination: &c.AgentPodIP,
Destination: &c.agentPodIP,
EnvVar: "AGENT_POD_IP",
},
cli.StringFlag{
Expand All @@ -88,13 +86,13 @@ func Run(clx *cli.Context) {
fmt.Printf("failed to validate config: %v", err)
os.Exit(-1)
}
k, err := kubelet.New(&c)
k, err := newKubelet(&c)
if err != nil {
fmt.Printf("failed to create new virtual kubelet instance: %v", err)
os.Exit(-1)
}

if err := k.RegisterNode(c.KubeletPort, c.ClusterNamespace, c.ClusterName, c.AgentPodIP); err != nil {
if err := k.RegisterNode(c.kubeletPort, c.clusterNamespace, c.clusterName, c.agentPodIP); err != nil {
fmt.Printf("failed to register new node: %v", err)
os.Exit(-1)
}
Expand Down

0 comments on commit 4035566

Please sign in to comment.