diff --git a/cmd/kndp/configuration/apply.go b/cmd/kndp/configuration/apply.go index 6ef93e1..14863be 100644 --- a/cmd/kndp/configuration/apply.go +++ b/cmd/kndp/configuration/apply.go @@ -5,7 +5,6 @@ import ( "github.com/kndpio/kndp/internal/configuration" - "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "github.com/charmbracelet/log" @@ -15,6 +14,6 @@ type applyCmd struct { Link string `arg:"" required:"" help:"Link URL (or multiple comma separated) to Crossplane configuration to be applied to Environment."` } -func (c *applyCmd) Run(ctx context.Context, config *rest.Config, dynamicClient *dynamic.DynamicClient, logger *log.Logger) error { +func (c *applyCmd) Run(ctx context.Context, config *rest.Config, logger *log.Logger) error { return configuration.ApplyConfiguration(ctx, c.Link, config, logger) } diff --git a/cmd/kndp/configuration/configuration.go b/cmd/kndp/configuration/configuration.go index 2a50ae0..848e5e9 100644 --- a/cmd/kndp/configuration/configuration.go +++ b/cmd/kndp/configuration/configuration.go @@ -1,7 +1,8 @@ package configuration type Cmd struct { - Apply applyCmd `cmd:"" help:"Apply Crossplane Configuration."` - List listCmd `cmd:"" help:"Apply Crossplane Configuration."` + Apply applyCmd `cmd:"" help:"Apply Crossplane Configuration."` + List listCmd `cmd:"" help:"Apply Crossplane Configuration."` + Load loadCmd `cmd:"" help:"Load Crossplane Configuration from archive."` Delete deleteCmd `cmd:"" help:"Delete Crossplane Configuration."` } diff --git a/cmd/kndp/configuration/load.go b/cmd/kndp/configuration/load.go new file mode 100644 index 0000000..7798527 --- /dev/null +++ b/cmd/kndp/configuration/load.go @@ -0,0 +1,60 @@ +package configuration + +import ( + "bufio" + "context" + "os" + + "github.com/charmbracelet/log" + cfg "github.com/kndpio/kndp/internal/configuration" + "github.com/kndpio/kndp/internal/kube" + "github.com/kndpio/kndp/internal/registry" + "k8s.io/client-go/rest" +) + +type loadCmd struct { + Name string `arg:"" help:"Name of configuration."` + Path string `help:"Path to configuration package archive."` + Stdin bool `help:"Load configuration package from STDIN."` +} + +func (c *loadCmd) Run(ctx context.Context, config *rest.Config, logger *log.Logger) error { + + client, err := kube.Client(config) + if err != nil { + return err + } + + if !registry.IsLocalRegistry(ctx, client) { + logger.Warn("Local registry is not installed.") + return nil + } + + cfg := cfg.Configuration{} + cfg.Name = c.Name + logger.Debugf("Loading image to: %s", cfg.Name) + if c.Path != "" { + logger.Debugf("Loading from path: %s", c.Path) + err = cfg.LoadPathArchive(c.Path) + if err != nil { + return err + } + } else if c.Stdin { + logger.Debug("Loading from STDIN") + reader := bufio.NewReader(os.Stdin) + err = cfg.LoadStdinArchive(reader) + if err != nil { + return err + } + } else { + logger.Warn("Archive path or STDIN required for load configuration.") + return nil + } + logger.Debug("Pushing to local registry") + err = registry.PushLocalRegistry(ctx, cfg.Name, cfg.Image, config, logger) + if err != nil { + return err + } + logger.Infof("Image archive %s loaded to local registry.", cfg.Name) + return nil +} diff --git a/cmd/kndp/registry/create.go b/cmd/kndp/registry/create.go index 86b2bd1..8d5cc5d 100644 --- a/cmd/kndp/registry/create.go +++ b/cmd/kndp/registry/create.go @@ -21,13 +21,16 @@ type createCmd struct { Username string `required:"" help:"is your Username."` Password string `required:"" help:"is your Password."` Email string `required:"" help:"is your Email."` + Default bool `help:"Set registry as default."` + Local bool `help:"Create local registry."` } func (c *createCmd) Run(ctx context.Context, client *kubernetes.Clientset, config *rest.Config, logger *log.Logger) error { reg := registry.New(c.RegistryServer, c.Username, c.Password, c.Email) - - verr := reg.Validate() + reg.SetDefault(c.Default) + reg.SetLocal(c.Local) + verr := reg.Validate(logger) if verr != nil { errs := verr.(validator.ValidationErrors) for _, err := range errs { diff --git a/cmd/kndp/registry/delete.go b/cmd/kndp/registry/delete.go index 7723ba6..611bacf 100644 --- a/cmd/kndp/registry/delete.go +++ b/cmd/kndp/registry/delete.go @@ -9,12 +9,16 @@ import ( ) type deleteCmd struct { - Name string `required:"" help:"Registry name."` + Name string `required:"" help:"Registry name."` + Default bool `help:"Remove from default."` + Local bool `help:"Remove associated local registry."` } func (c deleteCmd) Run(ctx context.Context, config *rest.Config, logger *log.Logger) error { reg := registry.Registry{} reg.Name = c.Name + reg.SetDefault(c.Default) + reg.SetLocal(c.Local) err := reg.Delete(ctx, config, logger) if err != nil { logger.Error(err) diff --git a/internal/configuration/configuration.go b/internal/configuration/configuration.go index f7216df..10da308 100644 --- a/internal/configuration/configuration.go +++ b/internal/configuration/configuration.go @@ -8,6 +8,7 @@ import ( "github.com/charmbracelet/log" configuration "github.com/crossplane/crossplane/apis/pkg/v1" + regv1 "github.com/google/go-containerregistry/pkg/v1" "github.com/kndpio/kndp/internal/kube" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -22,6 +23,11 @@ const ( apiPlural = "configurations" ) +type Configuration struct { + Name string + Image regv1.Image +} + func CheckHealthStatus(status []condition.Condition) bool { healthStatus := false for _, condition := range status { diff --git a/internal/configuration/load.go b/internal/configuration/load.go new file mode 100644 index 0000000..1f3c641 --- /dev/null +++ b/internal/configuration/load.go @@ -0,0 +1,37 @@ +package configuration + +import ( + "bufio" + "io" + "os" + + "github.com/google/go-containerregistry/pkg/v1/tarball" +) + +// Load configuration package from TAR archive path +func (c *Configuration) LoadPathArchive(path string) error { + image, err := tarball.ImageFromPath(path, nil) + if err != nil { + return err + } + c.Image = image + return nil +} + +// Load configuration package from STDIN +func (c *Configuration) LoadStdinArchive(stream *bufio.Reader) error { + stdin, err := io.ReadAll(stream) + if err != nil { + return err + } + tmpFile, err := os.CreateTemp("", "kndp-configuration-*") + if err != nil { + return err + } + tmpFile.Write(stdin) + if err != nil { + return err + } + + return c.LoadPathArchive(tmpFile.Name()) +} diff --git a/internal/engine/engine.go b/internal/engine/engine.go index 10d18dc..e2b4cfb 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -64,6 +64,7 @@ var ( "xpkg.upbound.io/crossplane-contrib/provider-kubernetes:v0.13.0", }, }, + "args": []string{}, } ) diff --git a/internal/registry/local.go b/internal/registry/local.go new file mode 100644 index 0000000..816fb92 --- /dev/null +++ b/internal/registry/local.go @@ -0,0 +1,231 @@ +package registry + +import ( + "bytes" + "context" + "fmt" + "net" + "net/http" + "net/url" + "strings" + + "github.com/charmbracelet/log" + "github.com/google/go-containerregistry/pkg/name" + regv1 "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/remote" + "github.com/kndpio/kndp/internal/kube" + "github.com/kndpio/kndp/internal/namespace" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" +) + +const ( + deployName = "kndp-registry" + svcName = "registry" + deployPort = 5000 + svcPort = 80 +) + +var ( + matchLabels = map[string]string{ + "app": deployName, + } +) + +// Create in cluster registry +func (r *Registry) CreateLocal(ctx context.Context, client *kubernetes.Clientset) error { + + deploy := &appsv1.Deployment{ + ObjectMeta: v1.ObjectMeta{ + Name: deployName, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &v1.LabelSelector{ + MatchLabels: matchLabels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: matchLabels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "registry", + Image: "registry:2", + Ports: []corev1.ContainerPort{ + { + Name: "oci", + Protocol: corev1.ProtocolTCP, + ContainerPort: deployPort, + }, + }, + }, + }, + }, + }, + }, + } + deployments := client.AppsV1().Deployments(namespace.Namespace) + + _, err := deployments.Get(ctx, deploy.GetName(), v1.GetOptions{}) + + if err == nil { + _, err := deployments.Update(ctx, deploy, v1.UpdateOptions{}) + if err != nil { + return err + } + } else { + _, err := deployments.Create(ctx, deploy, v1.CreateOptions{}) + if err != nil { + return err + } + } + + svc := &corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: svcName, + }, + Spec: corev1.ServiceSpec{ + Type: "ClusterIP", + Selector: deploy.Spec.Selector.MatchLabels, + Ports: []corev1.ServicePort{ + { + Name: "oci", + Protocol: corev1.ProtocolTCP, + Port: svcPort, + TargetPort: intstr.FromInt(deployPort), + }, + }, + }, + } + + svcs := client.CoreV1().Services(namespace.Namespace) + _, err = svcs.Get(ctx, svc.GetName(), v1.GetOptions{}) + if err == nil { + _, err := svcs.Update(ctx, svc, v1.UpdateOptions{}) + if err != nil { + return err + } + } else { + _, err := svcs.Create(ctx, svc, v1.CreateOptions{}) + if err != nil { + return err + } + } + + return nil +} + +// Delete in cluster registry +func (r *Registry) DeleteLocal(ctx context.Context, client *kubernetes.Clientset, logger *log.Logger) error { + svcs := client.CoreV1().Services(namespace.Namespace) + eSvc, _ := svcs.Get(ctx, svcName, v1.GetOptions{}) + if eSvc != nil { + err := svcs.Delete(ctx, svcName, v1.DeleteOptions{}) + if err != nil { + return err + } + } else { + logger.Warnf("Service %s not found", svcName) + } + deployments := client.AppsV1().Deployments(namespace.Namespace) + eDeploy, _ := deployments.Get(ctx, deployName, v1.GetOptions{}) + if eDeploy != nil { + err := deployments.Delete(ctx, deployName, v1.DeleteOptions{}) + if err != nil { + return err + } + } else { + logger.Warnf("Deployment %s not found", deployName) + } + return nil +} + +func IsLocalRegistry(ctx context.Context, client *kubernetes.Clientset) bool { + return true +} + +func PushLocalRegistry(ctx context.Context, imageName string, image regv1.Image, config *rest.Config, logger *log.Logger) error { + + client, err := kube.Client(config) + if err != nil { + return err + } + + pods := client.CoreV1().Pods(namespace.Namespace) + regs, err := pods.List(ctx, v1.ListOptions{Limit: 1, LabelSelector: "app=" + deployName}) + if err != nil { + return err + } + + roundTripper, upgrader, err := spdy.RoundTripperFor(config) + if err != nil { + return err + } + + lPort, err := getFreePort() + if err != nil { + return err + } + + logger.Debugf("Found local registry with name: %s", regs.Items[0].GetName()) + + path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace.Namespace, regs.Items[0].GetName()) + hostIP := strings.TrimLeft(config.Host, "htps:/") + serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP} + + logger.Debugf("Dialer server URL: %s", serverURL.String()) + + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL) + stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1) + out, errOut := new(bytes.Buffer), new(bytes.Buffer) + forwarder, err := portforward.New(dialer, []string{fmt.Sprint(lPort) + ":" + fmt.Sprint(deployPort)}, stopChan, readyChan, out, errOut) + if err != nil { + return err + } + + go func() { + for range readyChan { + } + if len(errOut.String()) != 0 { + close(stopChan) + } else if len(out.String()) != 0 { + logger.Debug(out.String()) + } + refName := "localhost:" + fmt.Sprint(lPort) + "/" + imageName + logger.Debugf("Try to push to reference: %s", refName) + ref, err := name.ParseReference(refName) + if err != nil { + close(stopChan) + } + err = remote.Write(ref, image) + if err != nil { + close(stopChan) + } + logger.Debug("Pushed to remote registry.") + close(stopChan) + }() + + if err = forwarder.ForwardPorts(); err != nil { + return err + } + return nil +} + +func getFreePort() (port int, err error) { + var a *net.TCPAddr + if a, err = net.ResolveTCPAddr("tcp", "localhost:0"); err == nil { + var l *net.TCPListener + if l, err = net.ListenTCP("tcp", a); err == nil { + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil + } + } + return +} diff --git a/internal/registry/registry.go b/internal/registry/registry.go index d98f9bd..b087a71 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -18,7 +18,12 @@ import ( "k8s.io/client-go/rest" ) -const RegistryServerLabel = "kndp-registry-server-url" +const ( + RegistryServerLabel = "kndp-registry-server-url" + DefaultRemoteDomain = "xpkg.upbound.io" + LocalServiceName = "registry" + DefaultLocalDomain = LocalServiceName + "." + namespace.Namespace + ".svc.cluster.local" +) type RegistryAuth struct { Username string `json:"username" validate:"required"` @@ -32,7 +37,9 @@ type RegistryConfig struct { } type Registry struct { - Config RegistryConfig + Config RegistryConfig + Default bool + Local bool corev1.Secret } @@ -54,6 +61,7 @@ func Registries(ctx context.Context, client *kubernetes.Clientset) ([]*Registry, // Creates new Registry by required parameters func New(server string, username string, password string, email string) Registry { registry := Registry{ + Default: false, Config: RegistryConfig{ Auths: map[string]RegistryAuth{ server: { @@ -72,16 +80,20 @@ func New(server string, username string, password string, email string) Registry } // Validate data in Registry object -func (r *Registry) Validate() error { +func (r *Registry) Validate(logger *log.Logger) error { validate := validator.New(validator.WithRequiredStructEnabled()) for serverUrl, auth := range r.Config.Auths { - err := validate.Var(serverUrl, "required,http_url") - if err != nil { - return err + if !r.Local { + err := validate.Var(serverUrl, "required,http_url") + if err != nil { + return err + } + } else { + logger.Warn("Custom domains for local repositories do not supported yet, set default: " + DefaultLocalDomain) } - err = validate.Struct(auth) + err := validate.Struct(auth) if err != nil { return err } @@ -131,6 +143,13 @@ func (r *Registry) Create(ctx context.Context, config *rest.Config, logger *log. return err } + if r.Local { + err := r.CreateLocal(ctx, client) + if err != nil { + return err + } + } + secretSpec := r.SecretSpec() secret, err := secretClient(client).Create(ctx, &secretSpec, metav1.CreateOptions{}) if err != nil { @@ -157,6 +176,23 @@ func (r *Registry) Create(ctx context.Context, config *rest.Config, logger *log. secret.ObjectMeta.Name, ) + if r.Default { + if release.Config["args"] == nil { + release.Config["args"] = []interface{}{} + } + args := []string{} + for _, arg := range release.Config["args"].([]interface{}) { + if !strings.Contains(arg.(string), "--registry") { + args = append(args, arg.(string)) + } + } + + release.Config["args"] = append( + args, + "--registry="+r.Domain(), + ) + } + logger.Debug("Upgrade Corssplane chart", "Values", release.Config) return installer.Upgrade(engine.Version, release.Config) @@ -174,6 +210,8 @@ func (r *Registry) ToSecret() *corev1.Secret { json.Unmarshal(rJson, &sec) return &sec } + +// Delete registry func (r *Registry) Delete(ctx context.Context, config *rest.Config, logger *log.Logger) error { installer, err := engine.GetEngine(config) @@ -203,6 +241,19 @@ func (r *Registry) Delete(ctx context.Context, config *rest.Config, logger *log. return nil } + if r.Default { + if release.Config["args"] != nil { + args := []string{} + for _, arg := range release.Config["args"].([]interface{}) { + if !strings.Contains(arg.(string), "--registry") { + args = append(args, arg.(string)) + } + } + + release.Config["args"] = args + } + } + err = installer.Upgrade(engine.Version, release.Config) if err != nil { return err @@ -213,6 +264,11 @@ func (r *Registry) Delete(ctx context.Context, config *rest.Config, logger *log. if err != nil { return err } + + if r.Local { + r.DeleteLocal(ctx, client, logger) + } + return secretClient(client).Delete(ctx, r.Name, metav1.DeleteOptions{}) } @@ -254,6 +310,29 @@ func CopyRegistries(ctx context.Context, logger *log.Logger, sourceConfig *rest. return nil } +// Make registry default +func (r *Registry) SetDefault(d bool) { + r.Default = d +} + +// Make local registry +func (r *Registry) SetLocal(l bool) { + r.Local = l +} + +// Domain of primary registry +func (r *Registry) Domain() string { + if r.Local { + return DefaultLocalDomain + } + domain := DefaultRemoteDomain + for server := range r.Config.Auths { + domain = strings.Split(server, "/")[2] + break + } + return domain +} + func secretClient(client *kubernetes.Clientset) kv1.SecretInterface { return client.CoreV1().Secrets(namespace.Namespace) }