Skip to content

Commit

Permalink
Merge pull request #205 from castai/anywhere-provider
Browse files Browse the repository at this point in the history
feat: anywhere provider
  • Loading branch information
laimonasr authored Dec 2, 2024
2 parents 0ef0a69 + 910229a commit 01d33bb
Show file tree
Hide file tree
Showing 17 changed files with 682 additions and 34 deletions.
2 changes: 1 addition & 1 deletion cmd/dump/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func run(ctx context.Context) error {
if cfg.Static != nil && cfg.Static.ClusterID != "" {
clusterID = cfg.Static.ClusterID
} else {
c, err := discoveryService.GetClusterID(ctx)
c, err := discoveryService.GetKubeSystemNamespaceID(ctx)
if err != nil {
return fmt.Errorf("getting cluster ID: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions internal/castai/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type OpenshiftParams struct {
InternalID string `json:"internalId"`
}

type AnywhereParams struct {
ClusterName string `json:"clusterName"`
KubeSystemNamespaceID uuid.UUID `json:"kubeSystemNamespaceId"`
}

type RegisterClusterRequest struct {
ID uuid.UUID `json:"id"`
Name string `json:"name"`
Expand All @@ -49,6 +54,7 @@ type RegisterClusterRequest struct {
KOPS *KOPSParams `json:"kops"`
AKS *AKSParams `json:"aks"`
Openshift *OpenshiftParams `json:"openshift"`
Anywhere *AnywhereParams `json:"anywhere"`
}

type Cluster struct {
Expand Down
5 changes: 5 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
KOPS *KOPS `mapstructure:"kops"`
AKS *AKS `mapstructure:"aks"`
OpenShift *OpenShift `mapstructure:"openshift"`
Anywhere *Anywhere `mapstructure:"anywhere"`

Static *Static `mapstructure:"static"`
Controller *Controller `mapstructure:"controller"`
Expand Down Expand Up @@ -109,6 +110,10 @@ type Static struct {
ClusterID string `mapstructure:"cluster_id"`
}

type Anywhere struct {
ClusterName string `mapstructure:"cluster_name"`
}

type Controller struct {
Interval time.Duration `mapstructure:"interval"`
MemoryPressureInterval time.Duration `mapstructure:"memory_pressure_interval"`
Expand Down
125 changes: 125 additions & 0 deletions internal/services/controller/mock/workqueue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 52 additions & 12 deletions internal/services/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ import (
)

type Service interface {
// GetCSP discovers the cluster cloud service provider (CSP) by listing the cluster nodes and inspecting their labels.
// CSP is retrieved by parsing the Node.Spec.ProviderID property.
GetCSP(ctx context.Context) (csp cloud.Cloud, reterr error)

// GetCSPAndRegion discovers the cluster cloud service provider (CSP) and the region the cluster is deployed in by
// listing the cluster nodes and inspecting their labels. CSP is retrieved by parsing the Node.Spec.ProviderID property.
// Whereas the region is read from the well-known node region labels.
GetCSPAndRegion(ctx context.Context) (csp cloud.Cloud, region string, reterr error)

// GetClusterID retrieves the cluster ID by reading the UID of the kube-system namespace.
GetClusterID(ctx context.Context) (*uuid.UUID, error)
// GetKubeSystemNamespaceID retrieves the UID of the kube-system namespace.
GetKubeSystemNamespaceID(ctx context.Context) (*uuid.UUID, error)

// GetKOPSClusterNameAndStateStore discovers the cluster name and kOps state store bucket from the kube-system namespace
// annotation. kOps annotates the kube-system namespace with annotations such as this:
Expand Down Expand Up @@ -57,22 +61,26 @@ func New(clientset kubernetes.Interface, dyno dynamic.Interface) *ServiceImpl {
}
}

func (s *ServiceImpl) GetCSP(ctx context.Context) (cloud.Cloud, error) {
return s.getCSP(ctx, "")
}

func (s *ServiceImpl) GetCSPAndRegion(ctx context.Context) (csp cloud.Cloud, region string, reterr error) {
return s.getCSPAndRegion(ctx, "")
}

func (s *ServiceImpl) GetClusterID(ctx context.Context) (*uuid.UUID, error) {
func (s *ServiceImpl) GetKubeSystemNamespaceID(ctx context.Context) (*uuid.UUID, error) {
ns, err := s.getKubeSystemNamespace(ctx)
if err != nil {
return nil, err
}

clusterID, err := uuid.Parse(string(ns.UID))
namespaceID, err := uuid.Parse(string(ns.UID))
if err != nil {
return nil, fmt.Errorf("parsing namespace %q uid: %w", metav1.NamespaceSystem, err)
}

return &clusterID, nil
return &namespaceID, nil
}

func (s *ServiceImpl) getKubeSystemNamespace(ctx context.Context) (*v1.Namespace, error) {
Expand All @@ -93,10 +101,35 @@ func (s *ServiceImpl) getKubeSystemNamespace(ctx context.Context) (*v1.Namespace
return ns, nil
}

func (s *ServiceImpl) getCSP(ctx context.Context, next string) (cloud.Cloud, error) {
nodes, err := s.listNodes(ctx, next)
if err != nil {
return "", err
}

for i := range nodes.Items {
node := &nodes.Items[i]

if !isNodeReady(node) {
continue
}

if nodeCSP, ok := getNodeCSP(node); ok {
return nodeCSP, nil
}
}

if nodes.Continue != "" {
return s.getCSP(ctx, nodes.Continue)
}

return "", fmt.Errorf("failed to discover csp")
}

func (s *ServiceImpl) getCSPAndRegion(ctx context.Context, next string) (csp cloud.Cloud, region string, reterr error) {
nodes, err := s.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{Limit: 10, Continue: next})
nodes, err := s.listNodes(ctx, next)
if err != nil {
return "", "", fmt.Errorf("listing nodes: %w", err)
return "", "", err
}

for i := range nodes.Items {
Expand All @@ -106,13 +139,11 @@ func (s *ServiceImpl) getCSPAndRegion(ctx context.Context, next string) (csp clo
continue
}

nodeCSP, ok := getCSP(node)
if ok {
if nodeCSP, ok := getNodeCSP(node); ok {
csp = nodeCSP
}

nodeRegion, ok := getRegion(node)
if ok {
if nodeRegion, ok := getRegion(node); ok {
region = nodeRegion
}

Expand All @@ -128,6 +159,15 @@ func (s *ServiceImpl) getCSPAndRegion(ctx context.Context, next string) (csp clo
return "", "", fmt.Errorf("failed discovering properties: csp=%q, region=%q", csp, region)
}

func (s *ServiceImpl) listNodes(ctx context.Context, next string) (*v1.NodeList, error) {
nodes, err := s.clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{Limit: 10, Continue: next})
if err != nil {
return nil, fmt.Errorf("listing nodes: %w", err)
}

return nodes, nil
}

func isNodeReady(n *v1.Node) bool {
for _, cond := range n.Status.Conditions {
if cond.Type == v1.NodeReady && cond.Status == v1.ConditionTrue {
Expand All @@ -150,7 +190,7 @@ func getRegion(n *v1.Node) (string, bool) {
return "", false
}

func getCSP(n *v1.Node) (cloud.Cloud, bool) {
func getNodeCSP(n *v1.Node) (cloud.Cloud, bool) {
providerID := n.Spec.ProviderID

if strings.HasPrefix(providerID, "gce://") {
Expand Down
4 changes: 2 additions & 2 deletions internal/services/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"castai-agent/pkg/cloud"
)

func TestServiceImpl_GetClusterID(t *testing.T) {
func TestServiceImpl_GetKubeSystemNamespaceID(t *testing.T) {
namespaceID := uuid.New()
objects := []runtime.Object{
&v1.Namespace{
Expand All @@ -33,7 +33,7 @@ func TestServiceImpl_GetClusterID(t *testing.T) {

s := New(clientset, dyno)

id, err := s.GetClusterID(context.Background())
id, err := s.GetKubeSystemNamespaceID(context.Background())

require.NoError(t, err)
require.Equal(t, namespaceID, *id)
Expand Down
45 changes: 30 additions & 15 deletions internal/services/discovery/mock/discovery.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 01d33bb

Please sign in to comment.