Skip to content

Commit

Permalink
chore: new struct for context
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmehrotra committed Feb 20, 2025
1 parent ade030e commit 9cb631d
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 81 deletions.
13 changes: 2 additions & 11 deletions connection/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,7 @@ func SetupConnection(ctx context.Context, connections ExecConnections, cmd *osEx
return nil, err
}

if k8sclient, err := connections.Kubernetes.Populate(ctx.WithNamespace(scraperNamespace), true); err != nil {
return nil, fmt.Errorf("failed to hydrate kubernetes connection: %w", err)
} else {
ctx = ctx.WithKubernetes(k8sclient)
}

ctx = ctx.WithNamespace(scraperNamespace).WithKubernetes(connections.Kubernetes)
break
}
}
Expand All @@ -167,11 +162,7 @@ func SetupConnection(ctx context.Context, connections ExecConnections, cmd *osEx

if connections.Kubernetes != nil {
if lo.FromPtr(connections.FromConfigItem) == "" {
if k8sclient, err := connections.Kubernetes.Populate(ctx, true); err != nil {
return nil, fmt.Errorf("failed to hydrate kubernetes connection: %w", err)
} else {
ctx = ctx.WithKubernetes(k8sclient)
}
ctx = ctx.WithKubernetes(connections.Kubernetes)
}

if filepath.IsAbs(connections.Kubernetes.Kubeconfig.ValueStatic) {
Expand Down
43 changes: 23 additions & 20 deletions connection/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package connection

import (
"fmt"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/flanksource/duty/cache"
"github.com/flanksource/duty/context"
dutyKubernetes "github.com/flanksource/duty/kubernetes"
"github.com/flanksource/duty/models"
Expand Down Expand Up @@ -54,7 +52,26 @@ type KubernetesConnection struct {
GKE *GKEConnection `json:"gke,omitempty"`
CNRM *CNRMConnection `json:"cnrm,omitempty"`

Client *dutyKubernetes.Client
// For tests and special cases
CustomClientSet kubernetes.Interface
CustomRestConfig *rest.Config
}

func (c KubernetesConnection) Hash() string {
if c.ConnectionName != "" {
return "connection=" + c.ConnectionName
}
if c.Kubeconfig != nil {
return "kubeconfig=" + c.Kubeconfig.String()
}
if c.EKS != nil {
return fmt.Sprintf("eks=%s/%v", c.EKS.Cluster, c.EKS.ToModel())
}
return "local"
}

func (c KubernetesConnection) CanExpire() bool {
return c.EKS != nil || c.GKE != nil || c.EKS != nil
}

func (t KubernetesConnection) ToModel() models.Connection {
Expand All @@ -64,25 +81,11 @@ func (t KubernetesConnection) ToModel() models.Connection {
}
}

var k8sClientCache = cache.NewCache[*dutyKubernetes.Client]("k8s-client-cache", 24*time.Hour)

func (t *KubernetesConnection) Populate(ctx context.Context, freshToken bool) (*dutyKubernetes.Client, error) {
clientSet, restConfig, err := t.populate(ctx, freshToken)
if err != nil {
return nil, fmt.Errorf("error populating kubernetes connection: %w", err)
}

cacheKey := dutyKubernetes.RestConfigFingerprint(restConfig)
if c, err := k8sClientCache.Get(ctx, cacheKey); err == nil {
return c, nil
func (t *KubernetesConnection) Populate(ctx context.Context, freshToken bool) (kubernetes.Interface, *rest.Config, error) {
if t.CustomClientSet != nil {
return t.CustomClientSet, t.CustomRestConfig, nil
}

c := dutyKubernetes.NewKubeClient(clientSet, restConfig)
_ = k8sClientCache.Set(ctx, cacheKey, c)
return c, nil
}

func (t *KubernetesConnection) populate(ctx context.Context, freshToken bool) (kubernetes.Interface, *rest.Config, error) {
if clientset, restConfig, err := t.KubeconfigConnection.Populate(ctx); err != nil {
return nil, nil, fmt.Errorf("failed to populate kube config connection: %w", err)
} else if clientset != nil {
Expand Down
106 changes: 79 additions & 27 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

commons "github.com/flanksource/commons/context"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/cache"
dutyGorm "github.com/flanksource/duty/gorm"
dutyKubernetes "github.com/flanksource/duty/kubernetes"
"github.com/flanksource/duty/models"
Expand All @@ -24,6 +25,8 @@ import (
"go.opentelemetry.io/otel/trace/noop"
"gorm.io/gorm"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

type ContextKey string
Expand Down Expand Up @@ -247,8 +250,17 @@ func (k Context) WithDebug() Context {
}
}

func (k Context) WithKubernetes(client *dutyKubernetes.Client) Context {
return k.WithValue("kubernetes-client", client)
type KubernetesConnection interface {
Populate(Context, bool) (kubernetes.Interface, *rest.Config, error)
Hash() string
CanExpire() bool
}

func (k Context) WithKubernetes(conn KubernetesConnection) Context {
if conn == nil {
return k
}
return k.WithValue("kubernetes-connection", conn)
}

func (k Context) WithNamespace(namespace string) Context {
Expand Down Expand Up @@ -344,50 +356,90 @@ func (k Context) Pool() *pgxpool.Pool {
// KubeAuthFingerprint generates a unique SHA-256 hash to identify the Kubernetes API server
// and client authentication details from the REST configuration.
func (k *Context) KubeAuthFingerprint() string {
rc := k.Kubernetes().RestConfig()
kc, _ := k.Kubernetes()
if kc == nil {
return ""
}
rc := kc.RestConfig()
if rc == nil {
return ""
}
return dutyKubernetes.RestConfigFingerprint(rc)
}

func (k *Context) Kubernetes() *dutyKubernetes.Client {
if v, ok := k.Value("kubernetes-client").(*dutyKubernetes.Client); ok {
func (k *Context) KubernetesConnection() KubernetesConnection {
if v, ok := k.Value("kubernetes-connection").(KubernetesConnection); ok {
return v
}
return nil
}

type KubernetesClient struct {
*dutyKubernetes.Client
Connection KubernetesConnection
expiry time.Time
}

func (k *Context) KubernetesClient() *dutyKubernetes.Client {
if v, ok := k.Value("kubernetes-client").(*dutyKubernetes.Client); ok {
return v
}
return nil
func (c *KubernetesClient) SetExpiry(d time.Duration) {
c.expiry = time.Now().Add(d)
}

// Deprecated: Use KubernetesClient
func (k *Context) KubernetesDynamicClient() *dutyKubernetes.Client {
return k.KubernetesClient()
func (c *KubernetesClient) RefreshWithExpiry(ctx Context, d time.Duration) error {
if !c.HasExpired() {
return nil
}
_, rc, err := c.Connection.Populate(ctx, true)
if err != nil {
return fmt.Errorf("%w", err)
}

// Update rest config in place for easy reuse
c.Config.Host = rc.Host
c.Config.TLSClientConfig = rc.TLSClientConfig
c.Config.BearerToken = rc.BearerToken

c.SetExpiry(15 * time.Minute)
return nil
}

func (k *Context) WithKubeconfig(input types.EnvVar) (*Context, error) {
if k.GetNamespace() == "" {
return nil, k.Oops().Errorf("namespace is required")
func (c KubernetesClient) HasExpired() bool {
if c.Connection.CanExpire() {
return time.Until(c.expiry) <= 0
}
return false
}

//val, err := k.GetEnvValueFromCache(input, k.GetNamespace())
//if err != nil {
//return k, k.Oops().Wrap(err)
//}
var k8sclientcache = cache.NewCache[*KubernetesClient]("k8s-client-cache", 24*time.Hour)

//clientset, restConfig, err := dutyKubernetes.NewClientFromPathOrConfig(k.Logger, val)
//if err != nil {
//return k, k.Oops().Wrap(err)
//}
func (k Context) Kubernetes() (*dutyKubernetes.Client, error) {
conn, ok := k.Value("kubernetes-connection").(KubernetesConnection)
if !ok {
return nil, fmt.Errorf("invalid type for KubernetesConnection")
}
connHash := conn.Hash()
if client, exists := k8sclientcache.Get(k, connHash); exists == nil {
client.RefreshWithExpiry(k, 15*time.Minute)
logger.Infof("From client cache")
return client.Client, nil
}
c, rc, err := conn.Populate(k, true)
if err != nil {
return nil, err
}
client := &KubernetesClient{
Client: dutyKubernetes.NewKubeClient(c, rc),
Connection: conn,
}
client.SetExpiry(15 * time.Minute)
k8sclientcache.Set(k, connHash, client)
return client.Client, nil
}

//c := k.WithKubernetes(clientset, restConfig)
return k, nil
func (k *Context) KubernetesClient() *dutyKubernetes.Client {
if v, ok := k.Value("kubernetes-client").(*dutyKubernetes.Client); ok {
return v
}
return nil
}

func (k Context) WithRLSPayload(payload *rls.Payload) Context {
Expand Down Expand Up @@ -546,7 +598,7 @@ func (k Context) HydrateConnection(connection *models.Connection) (*models.Conne
func (k Context) Wrap(ctx gocontext.Context) Context {
return NewContext(ctx, commons.WithTracer(k.GetTracer()), commons.WithLogger(k.Logger)).
WithDB(k.DB(), k.Pool()).
WithKubernetes(k.Kubernetes()).
WithKubernetes(k.KubernetesConnection()).
WithNamespace(k.GetNamespace())
}

Expand Down
25 changes: 21 additions & 4 deletions context/envvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ func GetHelmValueFromCache(ctx Context, namespace, releaseName, key string) (str
return "", fmt.Errorf("could not parse key:%s. must be a valid jsonpath expression. %w", key, err)
}

secretList, err := ctx.Kubernetes().CoreV1().Secrets(namespace).List(ctx, metav1.ListOptions{
client, err := ctx.Kubernetes()
if err != nil {
return "", fmt.Errorf("error creating kubernetes client: %w", err)
}

secretList, err := client.CoreV1().Secrets(namespace).List(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("type=%s", helmSecretType),
LabelSelector: fmt.Sprintf("status=deployed,name=%s", releaseName),
Limit: 1,
Expand Down Expand Up @@ -160,8 +165,12 @@ func GetSecretFromCache(ctx Context, namespace, name, key string) (string, error
if value, found := envCache.Get(id); found {
return value.(string), nil
}
client, err := ctx.Kubernetes()
if err != nil {
return "", fmt.Errorf("error creating kubernetes client: %w", err)
}

secret, err := ctx.Kubernetes().CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
secret, err := client.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("could not find secret %s/%s: %s", namespace, name, err)
}
Expand All @@ -184,7 +193,11 @@ func GetConfigMapFromCache(ctx Context, namespace, name, key string) (string, er
if value, found := envCache.Get(id); found {
return value.(string), nil
}
configMap, err := ctx.Kubernetes().CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
client, err := ctx.Kubernetes()
if err != nil {
return "", fmt.Errorf("error creating kubernetes client: %w", err)
}
configMap, err := client.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("could not get configmap %s/%s: %s", namespace, name, err)
}
Expand All @@ -206,7 +219,11 @@ func GetServiceAccountTokenFromCache(ctx Context, namespace, serviceAccount stri
if value, found := envCache.Get(id); found {
return value.(string), nil
}
tokenRequest, err := ctx.Kubernetes().CoreV1().ServiceAccounts(namespace).CreateToken(ctx, serviceAccount, &authenticationv1.TokenRequest{}, metav1.CreateOptions{})
client, err := ctx.Kubernetes()
if err != nil {
return "", fmt.Errorf("error creating kubernetes client: %w", err)
}
tokenRequest, err := client.CoreV1().ServiceAccounts(namespace).CreateToken(ctx, serviceAccount, &authenticationv1.TokenRequest{}, metav1.CreateOptions{})
if err != nil {
return "", fmt.Errorf("could not get token for service account %s/%s: %w", namespace, serviceAccount, err)
}
Expand Down
15 changes: 8 additions & 7 deletions kubernetes/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ type Client struct {
kubernetes.Interface
restMapper *restmapper.DeferredDiscoveryRESTMapper
dynamicClient *dynamic.DynamicClient
config *rest.Config
Config *rest.Config // Prefer updaating token in place
gvkClientCache cachev4.CacheInterface[dynamic.NamespaceableResourceInterface]
}

func NewKubeClient(client kubernetes.Interface, config *rest.Config) *Client {
return &Client{
Interface: client,
config: config,
Config: config,
gvkClientCache: cache.NewCache[dynamic.NamespaceableResourceInterface]("gvk-cache", 24*time.Hour),
}
}
Expand Down Expand Up @@ -95,6 +95,7 @@ func (c *Client) GetClientByGroupVersionKind(
) (dynamic.NamespaceableResourceInterface, error) {
cacheKey := group + version + kind
if dynamicClient, err := c.gvkClientCache.Get(ctx, cacheKey); err == nil {
logger.Infof("Cache hit for %s/%s/%s", group, version, kind)
return dynamicClient, nil
}

Expand Down Expand Up @@ -125,7 +126,7 @@ func (c *Client) GetClientByGroupVersionKind(
}

func (c *Client) RestConfig() *rest.Config {
return c.config
return c.Config
}

// WARN: "Kind" is not specific enough.
Expand Down Expand Up @@ -175,7 +176,7 @@ func (c *Client) GetDynamicClient() (dynamic.Interface, error) {
}

var err error
c.dynamicClient, err = dynamic.NewForConfig(c.config)
c.dynamicClient, err = dynamic.NewForConfig(c.Config)
return c.dynamicClient, err
}

Expand All @@ -185,13 +186,13 @@ func (c *Client) GetRestMapper() (meta.RESTMapper, error) {
}

// re-use kubectl cache
host := c.config.Host
host := c.Config.Host
host = strings.ReplaceAll(host, "https://", "")
host = strings.ReplaceAll(host, "-", "_")
host = strings.ReplaceAll(host, ":", "_")
cacheDir := os.ExpandEnv("$HOME/.kube/cache/discovery/" + host)
cache, err := disk.NewCachedDiscoveryClientForConfig(
c.config,
c.Config,
cacheDir,
"",
properties.Duration(10*time.Minute, "kubernetes.cache.timeout"),
Expand Down Expand Up @@ -224,7 +225,7 @@ func (c *Client) ExecutePodf(
req.Param("command", c)
}

exec, err := remotecommand.NewSPDYExecutor(c.config, "POST", req.URL())
exec, err := remotecommand.NewSPDYExecutor(c.Config, "POST", req.URL())
if err != nil {
return "", "", fmt.Errorf("ExecutePodf: Failed to get SPDY Executor: %v", err)
}
Expand Down
Loading

0 comments on commit 9cb631d

Please sign in to comment.