Skip to content

Commit

Permalink
adjust size of data sent via nats
Browse files Browse the repository at this point in the history
Signed-off-by: nyagamunene <[email protected]>
  • Loading branch information
nyagamunene committed Dec 16, 2024
1 parent 29755e0 commit 86dfd65
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 22 deletions.
8 changes: 3 additions & 5 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ import (
"golang.org/x/sync/errgroup"
)

const (
svcName = "proxy"
mqttPrefix = "MQTT_REGISTRY_"
httpPrefix = "HTTP_"
)
const svcName = "proxy"

const (
// MQTT configuration settings.
Expand All @@ -29,6 +25,7 @@ const (
Authenticate = false
RegistryUsername = ""
RegistryPassword = ""
RegistryPAT = ""
)

func main() {
Expand All @@ -55,6 +52,7 @@ func main() {
Authenticate: Authenticate,
Username: RegistryUsername,
Password: RegistryPassword,
Token: RegistryPAT,
}

if err := httpCfg.Validate(); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions proplet/config.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"broker_url": "mqtt://localhost:1883",
"password": "example-password",
"proplet_id": "proplet-1",
"channel_id": "channel-1",
"registry_url": "",
"password": "3963a940-332e-4a18-aa57-bab4d4124ab0",
"proplet_id": "72fd490b-f91f-47dc-aa0b-a65931719ee1",
"channel_id": "cb6cb9ae-ddcf-41ab-8f32-f3e93b3a3be2",
"registry_url": "docker.io",
"registry_token": ""
}
97 changes: 84 additions & 13 deletions proxy/config/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@ package config

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/url"

ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"
)

const (
tag = "latest"
chunkSize = 1024 * 1024
chunkSize = 512000 // 500KB to ensure we're well under NATS limit
)

type ChunkPayload struct {
Expand All @@ -27,6 +30,7 @@ type ChunkPayload struct {
type HTTPProxyConfig struct {
RegistryURL string `env:"REGISTRY_URL" envDefault:"localhost:5000"`
Authenticate bool `env:"AUTHENTICATE" envDefault:"false"`
Token string `env:"PAT" envDefault:""`
Username string `env:"USERNAME" envDefault:""`
Password string `env:"PASSWORD" envDefault:""`
}
Expand All @@ -39,6 +43,19 @@ func (c *HTTPProxyConfig) Validate() error {
return fmt.Errorf("broker_url is not a valid URL: %w", err)
}

if c.Authenticate {
hasToken := c.Token != ""
hasCredentials := c.Username != "" && c.Password != ""

if !hasToken && !hasCredentials {
return errors.New("either PAT or username/password must be provided when authentication is enabled")
}

if hasToken && c.Username == "" {
return errors.New("username is required when using PAT authentication")
}
}

return nil
}

Expand All @@ -51,13 +68,24 @@ func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerName string
}

if c.Authenticate {
repo.Client = &auth.Client{
Client: retry.DefaultClient,
Cache: auth.NewCache(),
Credential: auth.StaticCredential(c.RegistryURL, auth.Credential{
var cred auth.Credential

if c.Username != "" && c.Password != "" {
cred = auth.Credential{
Username: c.Username,
Password: c.Password,
}),
}
} else if c.Token != "" {
cred = auth.Credential{
Username: c.Username,
AccessToken: c.Token,
}
}

repo.Client = &auth.Client{
Client: retry.DefaultClient,
Cache: auth.NewCache(),
Credential: auth.StaticCredential(c.RegistryURL, cred),
}
}

Expand All @@ -66,32 +94,75 @@ func (c *HTTPProxyConfig) FetchFromReg(ctx context.Context, containerName string
return nil, fmt.Errorf("failed to resolve manifest for %s: %w", containerName, err)
}

log.Printf("Container %s:", containerName)
log.Printf("- Manifest size: %d bytes", descriptor.Size)
log.Printf("- Media type: %s", descriptor.MediaType)

reader, err := repo.Fetch(ctx, descriptor)
if err != nil {
return nil, fmt.Errorf("failed to fetch blob for %s: %w", containerName, err)
return nil, fmt.Errorf("failed to fetch manifest for %s: %w", containerName, err)
}
defer reader.Close()

data, err := io.ReadAll(reader)
manifestData, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("failed to read blob for %s: %w", containerName, err)
return nil, fmt.Errorf("failed to read manifest for %s: %w", containerName, err)
}

totalChunks := (len(data) + chunkSize - 1) / chunkSize
var manifest ocispec.Manifest
if err := json.Unmarshal(manifestData, &manifest); err != nil {
return nil, fmt.Errorf("failed to parse manifest for %s: %w", containerName, err)
}

var largestLayer ocispec.Descriptor
var maxSize int64
for _, layer := range manifest.Layers {
if layer.Size > maxSize {
maxSize = layer.Size
largestLayer = layer
}
}

if largestLayer.Size == 0 {
return nil, fmt.Errorf("no valid layers found in manifest for %s", containerName)
}

log.Printf("- Found largest layer: %d bytes (%.2f MB)", largestLayer.Size, float64(largestLayer.Size)/(1024*1024))

Check failure on line 130 in proxy/config/http.go

View workflow job for this annotation

GitHub Actions / Lint and Build

Magic number: 1024, in <operation> detected (mnd)

layerReader, err := repo.Fetch(ctx, largestLayer)
if err != nil {
return nil, fmt.Errorf("failed to fetch layer for %s: %w", containerName, err)
}
defer layerReader.Close()

data, err := io.ReadAll(layerReader)
if err != nil {
return nil, fmt.Errorf("failed to read layer for %s: %w", containerName, err)
}

dataSize := len(data)
totalChunks := (dataSize + chunkSize - 1) / chunkSize

log.Printf("- Total data size: %d bytes (%.2f MB)", dataSize, float64(dataSize)/(1024*1024))

Check failure on line 146 in proxy/config/http.go

View workflow job for this annotation

GitHub Actions / Lint and Build

Magic number: 1024, in <operation> detected (mnd)
log.Printf("- Chunk size: %d bytes (500 KB)", chunkSize)
log.Printf("- Total chunks: %d", totalChunks)

chunks := make([]ChunkPayload, 0, totalChunks)
for i := range make([]struct{}, totalChunks) {
start := i * chunkSize
end := start + chunkSize
if end > len(data) {
end = len(data)
if end > dataSize {
end = dataSize
}

chunkData := data[start:end]
log.Printf("- Chunk %d size: %d bytes", i, len(chunkData))

chunk := ChunkPayload{
AppName: containerName,
ChunkIdx: i,
TotalChunks: totalChunks,
Data: data[start:end],
Data: chunkData,
}
chunks = append(chunks, chunk)
}
Expand Down
4 changes: 4 additions & 0 deletions proxy/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func NewMQTTClient(cfg *config.MQTTProxyConfig) (*RegistryClient, error) {
log.Println("MQTT reconnecting...")
})

opts.SetOnConnectHandler(func(client mqtt.Client) {
log.Println("MQTT connection established successfully")
})

client := mqtt.NewClient(opts)

return &RegistryClient{
Expand Down

0 comments on commit 86dfd65

Please sign in to comment.