From 86dfd650d4459539d9514a3c3a46fecf98a0279b Mon Sep 17 00:00:00 2001 From: nyagamunene Date: Mon, 16 Dec 2024 14:52:06 +0300 Subject: [PATCH] adjust size of data sent via nats Signed-off-by: nyagamunene --- cmd/proxy/main.go | 8 ++-- proplet/config.json | 8 ++-- proxy/config/http.go | 97 ++++++++++++++++++++++++++++++++++++++------ proxy/mqtt/mqtt.go | 4 ++ 4 files changed, 95 insertions(+), 22 deletions(-) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index d5d44ef..b633843 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -11,11 +11,7 @@ import ( "golang.org/x/sync/errgroup" ) -const ( - svcName = "proxy" - mqttPrefix = "MQTT_REGISTRY_" - httpPrefix = "HTTP_" -) +const svcName = "proxy" const ( // MQTT configuration settings. @@ -29,6 +25,7 @@ const ( Authenticate = false RegistryUsername = "" RegistryPassword = "" + RegistryPAT = "" ) func main() { @@ -55,6 +52,7 @@ func main() { Authenticate: Authenticate, Username: RegistryUsername, Password: RegistryPassword, + Token: RegistryPAT, } if err := httpCfg.Validate(); err != nil { diff --git a/proplet/config.json b/proplet/config.json index 4717fb5..fdd5a57 100644 --- a/proplet/config.json +++ b/proplet/config.json @@ -1,8 +1,8 @@ { "broker_url": "mqtt://localhost:1883", - "password": "example-password", - "proplet_id": "proplet-1", - "channel_id": "channel-1", - "registry_url": "", + "password": "3963a940-332e-4a18-aa57-bab4d4124ab0", + "proplet_id": "72fd490b-f91f-47dc-aa0b-a65931719ee1", + "channel_id": "cb6cb9ae-ddcf-41ab-8f32-f3e93b3a3be2", + "registry_url": "docker.io", "registry_token": "" } diff --git a/proxy/config/http.go b/proxy/config/http.go index 7ed8282..43b6c27 100644 --- a/proxy/config/http.go +++ b/proxy/config/http.go @@ -2,11 +2,14 @@ package config import ( "context" + "encoding/json" "errors" "fmt" "io" + "log" "net/url" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2/registry/remote" "oras.land/oras-go/v2/registry/remote/auth" "oras.land/oras-go/v2/registry/remote/retry" @@ -14,7 +17,7 @@ import ( const ( tag = "latest" - chunkSize = 1024 * 1024 + chunkSize = 512000 // 500KB to ensure we're well under NATS limit ) type ChunkPayload struct { @@ -27,6 +30,7 @@ type ChunkPayload struct { type HTTPProxyConfig struct { RegistryURL string `env:"REGISTRY_URL" envDefault:"localhost:5000"` Authenticate bool `env:"AUTHENTICATE" envDefault:"false"` + Token string `env:"PAT" envDefault:""` Username string `env:"USERNAME" envDefault:""` Password string `env:"PASSWORD" envDefault:""` } @@ -39,6 +43,19 @@ func (c *HTTPProxyConfig) Validate() error { return fmt.Errorf("broker_url is not a valid URL: %w", err) } + if c.Authenticate { + hasToken := c.Token != "" + hasCredentials := c.Username != "" && c.Password != "" + + if !hasToken && !hasCredentials { + return errors.New("either PAT or username/password must be provided when authentication is enabled") + } + + if hasToken && c.Username == "" { + return errors.New("username is required when using PAT authentication") + } + } + return nil } @@ -51,13 +68,24 @@ func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerName string } if c.Authenticate { - repo.Client = &auth.Client{ - Client: retry.DefaultClient, - Cache: auth.NewCache(), - Credential: auth.StaticCredential(c.RegistryURL, auth.Credential{ + var cred auth.Credential + + if c.Username != "" && c.Password != "" { + cred = auth.Credential{ Username: c.Username, Password: c.Password, - }), + } + } else if c.Token != "" { + cred = auth.Credential{ + Username: c.Username, + AccessToken: c.Token, + } + } + + repo.Client = &auth.Client{ + Client: retry.DefaultClient, + Cache: auth.NewCache(), + Credential: auth.StaticCredential(c.RegistryURL, cred), } } @@ -66,32 +94,75 @@ func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerName string return nil, fmt.Errorf("failed to resolve manifest for %s: %w", containerName, err) } + log.Printf("Container %s:", containerName) + log.Printf("- Manifest size: %d bytes", descriptor.Size) + log.Printf("- Media type: %s", descriptor.MediaType) + reader, err := repo.Fetch(ctx, descriptor) if err != nil { - return nil, fmt.Errorf("failed to fetch blob for %s: %w", containerName, err) + return nil, fmt.Errorf("failed to fetch manifest for %s: %w", containerName, err) } defer reader.Close() - data, err := io.ReadAll(reader) + manifestData, err := io.ReadAll(reader) if err != nil { - return nil, fmt.Errorf("failed to read blob for %s: %w", containerName, err) + return nil, fmt.Errorf("failed to read manifest for %s: %w", containerName, err) } - totalChunks := (len(data) + chunkSize - 1) / chunkSize + var manifest ocispec.Manifest + if err := json.Unmarshal(manifestData, &manifest); err != nil { + return nil, fmt.Errorf("failed to parse manifest for %s: %w", containerName, err) + } + + var largestLayer ocispec.Descriptor + var maxSize int64 + for _, layer := range manifest.Layers { + if layer.Size > maxSize { + maxSize = layer.Size + largestLayer = layer + } + } + + if largestLayer.Size == 0 { + return nil, fmt.Errorf("no valid layers found in manifest for %s", containerName) + } + + log.Printf("- Found largest layer: %d bytes (%.2f MB)", largestLayer.Size, float64(largestLayer.Size)/(1024*1024)) + + layerReader, err := repo.Fetch(ctx, largestLayer) + if err != nil { + return nil, fmt.Errorf("failed to fetch layer for %s: %w", containerName, err) + } + defer layerReader.Close() + + data, err := io.ReadAll(layerReader) + if err != nil { + return nil, fmt.Errorf("failed to read layer for %s: %w", containerName, err) + } + + dataSize := len(data) + totalChunks := (dataSize + chunkSize - 1) / chunkSize + + log.Printf("- Total data size: %d bytes (%.2f MB)", dataSize, float64(dataSize)/(1024*1024)) + log.Printf("- Chunk size: %d bytes (500 KB)", chunkSize) + log.Printf("- Total chunks: %d", totalChunks) chunks := make([]ChunkPayload, 0, totalChunks) for i := range make([]struct{}, totalChunks) { start := i * chunkSize end := start + chunkSize - if end > len(data) { - end = len(data) + if end > dataSize { + end = dataSize } + chunkData := data[start:end] + log.Printf("- Chunk %d size: %d bytes", i, len(chunkData)) + chunk := ChunkPayload{ AppName: containerName, ChunkIdx: i, TotalChunks: totalChunks, - Data: data[start:end], + Data: chunkData, } chunks = append(chunks, chunk) } diff --git a/proxy/mqtt/mqtt.go b/proxy/mqtt/mqtt.go index 5a1ff96..6d52419 100644 --- a/proxy/mqtt/mqtt.go +++ b/proxy/mqtt/mqtt.go @@ -43,6 +43,10 @@ func NewMQTTClient(cfg *config.MQTTProxyConfig) (*RegistryClient, error) { log.Println("MQTT reconnecting...") }) + opts.SetOnConnectHandler(func(client mqtt.Client) { + log.Println("MQTT connection established successfully") + }) + client := mqtt.NewClient(opts) return &RegistryClient{