Skip to content

Commit

Permalink
feat(api): add GetAnalysisRun endpoint (#1682)
Browse files Browse the repository at this point in the history
Signed-off-by: Hidde Beydals <[email protected]>
  • Loading branch information
hiddeco authored Mar 26, 2024
1 parent 0b43b26 commit 11a956e
Show file tree
Hide file tree
Showing 24 changed files with 24,593 additions and 722 deletions.
13 changes: 13 additions & 0 deletions api/service/v1alpha1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package akuity.io.kargo.service.v1alpha1;
import "google/protobuf/timestamp.proto";
import "v1alpha1/generated.proto";
import "k8s.io/api/core/v1/generated.proto";
import "rollouts/api/v1alpha1/generated.proto";

option go_package = "github.com/akuity/kargo/pkg/api/service/v1alpha1;svcv1alpha1";

Expand Down Expand Up @@ -72,6 +73,9 @@ service KargoService {
rpc GetCredentials(GetCredentialsRequest) returns (GetCredentialsResponse);
rpc ListCredentials(ListCredentialsRequest) returns (ListCredentialsResponse);
rpc UpdateCredentials(UpdateCredentialsRequest) returns (UpdateCredentialsResponse);

/* AnalysisRun APIs */
rpc GetAnalysisRun(GetAnalysisRunRequest) returns (GetAnalysisRunResponse);
}

message ComponentVersions {
Expand Down Expand Up @@ -493,3 +497,12 @@ message UpdateCredentialsRequest {
message UpdateCredentialsResponse {
k8s.io.api.core.v1.Secret credentials = 1;
}

message GetAnalysisRunRequest {
string namespace = 1;
string name = 2;
}

message GetAnalysisRunResponse {
github.com.akuity.kargo.internal.controller.rollouts.api.v1alpha1.AnalysisRun analysis_run = 1 [json_name = "analysisRun"];
}
1 change: 1 addition & 0 deletions buf.work.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
version: v1
directories:
- api
- internal/controller
- vendor
18 changes: 18 additions & 0 deletions charts/kargo/templates/api/cluster-role-binding.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,22 @@ subjects:
- kind: ServiceAccount
namespace: {{ .Release.Namespace }}
name: kargo-api
{{- if .Values.api.rollouts.integrationEnabled }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: kargo-api-rollouts
labels:
{{- include "kargo.labels" . | nindent 4 }}
{{- include "kargo.api.labels" . | nindent 4 }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: kargo-api-rollouts
subjects:
- kind: ServiceAccount
namespace: {{ .Release.Namespace }}
name: kargo-api
{{- end }}
{{- end }}
19 changes: 19 additions & 0 deletions charts/kargo/templates/api/cluster-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,23 @@ rules:
- secrets
verbs:
- "*"
{{- if .Values.api.rollouts.integrationEnabled }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: kargo-api-rollouts
labels:
{{- include "kargo.labels" . | nindent 4 }}
{{- include "kargo.api.labels" . | nindent 4 }}
rules:
- apiGroups:
- argoproj.io
resources:
- analysisruns
verbs:
- get
- list
- watch
{{- end }}
{{- end }}
249 changes: 148 additions & 101 deletions cmd/controlplane/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,102 +26,123 @@ import (
versionpkg "github.com/akuity/kargo/internal/version"
)

type apiOptions struct {
KubeConfig string

RolloutsEnabled bool
RolloutsKubeConfig string
AnalysisRunsNamespace string

Host string
Port string

Logger *log.Logger
}

func newAPICommand() *cobra.Command {
cmdOpts := &apiOptions{
Logger: log.StandardLogger(),
}

return &cobra.Command{
Use: "api",
DisableAutoGenTag: true,
SilenceErrors: true,
SilenceUsage: true,
RunE: func(cmd *cobra.Command, _ []string) error {
ctx := cmd.Context()

version := versionpkg.GetVersion()
log.WithFields(log.Fields{
"version": version.Version,
"commit": version.GitCommit,
}).Info("Starting Kargo API Server")

cfg := config.ServerConfigFromEnv()
restCfg, err := kubernetes.GetRestConfig(ctx, os.GetEnv("KUBECONFIG", ""))
if err != nil {
return fmt.Errorf("error loading REST config: %w", err)
}

scheme := runtime.NewScheme()
if err = kubescheme.AddToScheme(scheme); err != nil {
return fmt.Errorf("add Kubernetes api to scheme: %w", err)
}
if types.MustParseBool(os.GetEnv("ROLLOUTS_INTEGRATION_ENABLED", "true")) {
if argoRolloutsExists(ctx, restCfg) {
log.Info("Argo Rollouts integration is enabled")
if err = rollouts.AddToScheme(scheme); err != nil {
return fmt.Errorf("add argo rollouts api to scheme: %w", err)
}
} else {
log.Warn(
"Argo Rollouts integration was enabled, but no Argo Rollouts " +
"CRDs were found. Proceeding without Argo Rollouts integration.",
)
}
} else {
log.Info("Argo Rollouts integration is disabled")
}
if err = kargoapi.AddToScheme(scheme); err != nil {
return fmt.Errorf("add kargo api to scheme: %w", err)
}

internalClient, err := newClientForAPI(ctx, restCfg, scheme)
if err != nil {
return fmt.Errorf("create internal Kubernetes client: %w", err)
}
kubeClientOptions := kubernetes.ClientOptions{
NewInternalClient: func(context.Context, *rest.Config, *runtime.Scheme) (client.Client, error) {
return internalClient, nil
},
}
if cfg.OIDCConfig != nil {
kubeClientOptions.GlobalServiceAccountNamespaces = cfg.OIDCConfig.GlobalServiceAccountNamespaces
}
kubeClient, err := kubernetes.NewClient(ctx, restCfg, kubeClientOptions)
if err != nil {
return fmt.Errorf("create Kubernetes client: %w", err)
}

if cfg.AdminConfig != nil {
log.Info("admin account is enabled")
}
if cfg.OIDCConfig != nil {
log.WithFields(log.Fields{
"issuerURL": cfg.OIDCConfig.IssuerURL,
"clientID": cfg.OIDCConfig.ClientID,
"cliClientID": cfg.OIDCConfig.CLIClientID,
}).Info("SSO via OpenID Connect is enabled")
}

srv := api.NewServer(cfg, kubeClient, internalClient)
l, err := net.Listen(
"tcp",
fmt.Sprintf(
"%s:%s",
os.GetEnv("HOST", "0.0.0.0"),
os.GetEnv("PORT", "8080"),
),
)
if err != nil {
return fmt.Errorf("error creating listener: %w", err)
}
defer l.Close()

if err = srv.Serve(ctx, l); err != nil {
return fmt.Errorf("serve: %w", err)
}
return nil
cmdOpts.complete()

return cmdOpts.run(cmd.Context())
},
}
}

func newClientForAPI(ctx context.Context, r *rest.Config, scheme *runtime.Scheme) (client.Client, error) {
mgr, err := ctrl.NewManager(r, ctrl.Options{
func (o *apiOptions) complete() {
o.KubeConfig = os.GetEnv("KUBECONFIG", "")

o.RolloutsEnabled = types.MustParseBool(os.GetEnv("ROLLOUTS_INTEGRATION_ENABLED", "true"))
o.RolloutsKubeConfig = os.GetEnv("ROLLOUTS_KUBECONFIG", "")
o.AnalysisRunsNamespace = os.GetEnv("ROLLOUTS_ANALYSIS_RUNS_NAMESPACE", "")

o.Host = os.GetEnv("HOST", "0.0.0.0")
o.Port = os.GetEnv("PORT", "8080")
}

func (o *apiOptions) run(ctx context.Context) error {
version := versionpkg.GetVersion()
o.Logger.WithFields(log.Fields{
"version": version.Version,
"commit": version.GitCommit,
}).Info("Starting Kargo API Server")

cfg := config.ServerConfigFromEnv()

clientCfg, internalClient, err := o.setupAPIClient(ctx)
if err != nil {
return fmt.Errorf("error setting up internal Kubernetes API client: %w", err)
}
kubeClient, err := newWrappedKubernetesClient(ctx, clientCfg, internalClient, cfg)
if err != nil {
return fmt.Errorf("error creating Kubernetes client for Kargo API server: %w", err)
}

var rolloutsAvailable bool
switch {
case !o.RolloutsEnabled:
o.Logger.Info("Argo Rollouts integration is disabled")
case !argoRolloutsExists(ctx, clientCfg):
o.Logger.Warn(
"Argo Rollouts integration was enabled, but no Argo Rollouts " +
"CRDs were found. Proceeding without Argo Rollouts integration.",
)
default:
o.Logger.Info("Argo Rollouts integration is enabled")
}

if cfg.AdminConfig != nil {
log.Info("admin account is enabled")
}
if cfg.OIDCConfig != nil {
log.WithFields(log.Fields{
"issuerURL": cfg.OIDCConfig.IssuerURL,
"clientID": cfg.OIDCConfig.ClientID,
"cliClientID": cfg.OIDCConfig.CLIClientID,
}).Info("SSO via OpenID Connect is enabled")
}

srv := api.NewServer(cfg, kubeClient, internalClient, rolloutsAvailable)
l, err := net.Listen("tcp", fmt.Sprintf("%s:%s", o.Host, o.Port))
if err != nil {
return fmt.Errorf("error creating listener: %w", err)
}
defer l.Close()

if err = srv.Serve(ctx, l); err != nil {
return fmt.Errorf("error serving API: %w", err)
}
return nil
}

func (o *apiOptions) setupAPIClient(ctx context.Context) (*rest.Config, client.Client, error) {
restCfg, err := kubernetes.GetRestConfig(ctx, o.KubeConfig)
if err != nil {
return nil, nil, fmt.Errorf("get REST config: %w", err)
}

scheme := runtime.NewScheme()
if err = kubescheme.AddToScheme(scheme); err != nil {
return nil, nil, fmt.Errorf("error adding Kubernetes API to Kargo API manager scheme: %w", err)
}

if err = rollouts.AddToScheme(scheme); err != nil {
return nil, nil, fmt.Errorf("error adding Argo Rollouts API to Kargo API manager scheme: %w", err)
}

if err = kargoapi.AddToScheme(scheme); err != nil {
return nil, nil, fmt.Errorf("error adding Kargo API to Kargo API manager scheme: %w", err)
}

mgr, err := ctrl.NewManager(restCfg, ctrl.Options{
Scheme: scheme,
Metrics: server.Options{
BindAddress: "0",
Expand All @@ -135,48 +156,74 @@ func newClientForAPI(ctx context.Context, r *rest.Config, scheme *runtime.Scheme
},
})
if err != nil {
return nil, fmt.Errorf("new manager: %w", err)
return nil, nil, fmt.Errorf("error initializing Kargo API manager: %w", err)
}

if err = registerKargoIndexers(ctx, mgr); err != nil {
return nil, nil, fmt.Errorf("failed to register Kargo indexers: %w", err)
}

go func() {
if err := mgr.Start(ctx); err != nil {
panic(fmt.Errorf("error starting Kargo API manager: %w", err))
}
}()

return restCfg, mgr.GetClient(), nil
}

func registerKargoIndexers(ctx context.Context, mgr ctrl.Manager) error {
// Index Promotions by Stage
if err := kubeclient.IndexPromotionsByStage(ctx, mgr); err != nil {
return nil, fmt.Errorf("index Promotions by Stage: %w", err)
return fmt.Errorf("index Promotions by Stage: %w", err)
}

// Index Freight by Warehouse
if err := kubeclient.IndexFreightByWarehouse(ctx, mgr); err != nil {
return nil, fmt.Errorf("index Freight by Warehouse: %w", err)
return fmt.Errorf("index Freight by Warehouse: %w", err)
}

// Index Freight by Stages in which it has been verified
if err := kubeclient.IndexFreightByVerifiedStages(ctx, mgr); err != nil {
return nil, fmt.Errorf("index Freight by Stages in which it has been verified: %w", err)
return fmt.Errorf("index Freight by Stages in which it has been verified: %w", err)
}

// Index Freight by Stages for which it is approved
if err :=
kubeclient.IndexFreightByApprovedStages(ctx, mgr); err != nil {
return nil, fmt.Errorf("index Freight by Stages for which it has been approved: %w", err)
if err := kubeclient.IndexFreightByApprovedStages(ctx, mgr); err != nil {
return fmt.Errorf("index Freight by Stages for which it has been approved: %w", err)
}

// Index ServiceAccounts by ODIC email
if err := kubeclient.IndexServiceAccountsByOIDCEmail(ctx, mgr); err != nil {
return nil, fmt.Errorf("index ServiceAccounts by OIDC email: %w", err)
return fmt.Errorf("index ServiceAccounts by OIDC email: %w", err)
}

// Index ServiceAccounts by OIDC groups
if err := kubeclient.IndexServiceAccountsByOIDCGroups(ctx, mgr); err != nil {
return nil, fmt.Errorf("index ServiceAccounts by OIDC groups: %w", err)
return fmt.Errorf("index ServiceAccounts by OIDC groups: %w", err)
}

// Index ServiceAccounts by OIDC subjects
if err := kubeclient.IndexServiceAccountsByOIDCSubjects(ctx, mgr); err != nil {
return nil, fmt.Errorf("index ServiceAccounts by OIDC subjects: %w", err)
return fmt.Errorf("index ServiceAccounts by OIDC subjects: %w", err)
}

go func() {
if err := mgr.Start(ctx); err != nil {
panic(fmt.Errorf("start manager: %w", err))
}
}()
return nil
}

return mgr.GetClient(), nil
func newWrappedKubernetesClient(
ctx context.Context,
restCfg *rest.Config,
internalClient client.Client,
serverCfg config.ServerConfig,
) (kubernetes.Client, error) {
kubeClientOptions := kubernetes.ClientOptions{
NewInternalClient: func(context.Context, *rest.Config, *runtime.Scheme) (client.Client, error) {
return internalClient, nil
},
}
if serverCfg.OIDCConfig != nil {
kubeClientOptions.GlobalServiceAccountNamespaces = serverCfg.OIDCConfig.GlobalServiceAccountNamespaces
}
return kubernetes.NewClient(ctx, restCfg, kubeClientOptions)
}
Loading

0 comments on commit 11a956e

Please sign in to comment.