Skip to content

Commit

Permalink
fix: convert UploadArtefact to stream
Browse files Browse the repository at this point in the history
This would previously buffer every file in RAM before uploading to OCI. Instead
it now streams chunks directly from the FTL client to OCI via a pipe.
  • Loading branch information
stuartwdouglas authored and alecthomas committed Feb 17, 2025
1 parent 8190640 commit 1ae468b
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 262 deletions.
43 changes: 0 additions & 43 deletions backend/controller/artefacts/artefact.go

This file was deleted.

68 changes: 51 additions & 17 deletions backend/controller/artefacts/oci_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,39 @@ import (
"github.com/block/ftl/internal/log"
)

var _ Service = &OCIArtefactService{}
type ArtefactReader interface {
io.ReadCloser
}

// Metadata container for an artefact's metadata
type Metadata struct {
Executable bool
Size int64
Path string
}

type ArtefactUpload struct {
Digest sha256.SHA256
Size int64
Content io.ReadCloser
}

// Artefact container for an artefact's payload and metadata
type Artefact struct {
Digest sha256.SHA256
Metadata Metadata
Content io.ReadCloser
}

type ArtefactKey struct {
Digest sha256.SHA256
}

type ReleaseArtefact struct {
Artefact ArtefactKey
Path string
Executable bool
}

type RegistryConfig struct {
Registry string `help:"OCI container registry, in the form host[:port]/repository" env:"FTL_ARTEFACT_REGISTRY" required:""`
Expand Down Expand Up @@ -188,52 +220,54 @@ func (s *OCIArtefactService) GetDigestsKeys(ctx context.Context, digests []sha25
}

// Upload uploads the specific artifact as a raw blob and links it to a manifest to prevent GC
func (s *OCIArtefactService) Upload(ctx context.Context, artefact Artefact) (sha256.SHA256, error) {
func (s *OCIArtefactService) Upload(ctx context.Context, artefact ArtefactUpload) error {
repo, err := s.repoFactory()
logger := log.FromContext(ctx)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to connect to repository '%s': %w", s.registry, err)
return fmt.Errorf("unable to connect to repository '%s': %w", s.registry, err)
}

// 2. Pack the files and tag the packed manifest
artifactType := "application/vnd.ftl.artifact"

desc, err := pushBlob(ctx, artifactType, artefact.Content, repo)
contentDesc := ocispec.Descriptor{
MediaType: "application/vnd.ftl.artifact",
Digest: digest.Digest("sha256:" + artefact.Digest.String()),
Size: artefact.Size,
}
err = repo.Push(ctx, contentDesc, artefact.Content)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to push to in memory repository %w", err)
return fmt.Errorf("unable to push to in memory repository %w", err)
}
tag := desc.Digest.Hex()
tag := contentDesc.Digest.Hex()
parseSHA256, err := sha256.ParseSHA256(tag)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to parse sha %w", err)
return fmt.Errorf("unable to parse sha %w", err)
}
artefact.Digest = parseSHA256
logger.Debugf("Tagging module blob with digest '%s'", tag)

fileDescriptors := []ocispec.Descriptor{desc}
fileDescriptors := []ocispec.Descriptor{contentDesc}
config := ocispec.ImageConfig{} // Create a new image config
config.Labels = map[string]string{"type": "ftl-artifact"}
configBlob, err := json.Marshal(config) // Marshal the config to json
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to marshal config %w", err)
return fmt.Errorf("unable to marshal OCI image config: %w", err)
}
configDesc, err := pushBlob(ctx, ocispec.MediaTypeImageConfig, configBlob, repo) // push config blob
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to push config to in memory repository %w", err)
return fmt.Errorf("unable to push OCI image config to OCI registry: %w", err)
}
manifestBlob, err := generateManifestContent(configDesc, fileDescriptors...)
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to generate manifest content %w", err)
return fmt.Errorf("unable to generate manifest content: %w", err)
}
manifestDesc, err := pushBlob(ctx, ocispec.MediaTypeImageManifest, manifestBlob, repo) // push manifest blob
if err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to push manifest to in memory repository %w", err)
return fmt.Errorf("unable to push manifest to OCI registry: %w", err)
}
if err = repo.Tag(ctx, manifestDesc, tag); err != nil {
return sha256.SHA256{}, fmt.Errorf("unable to tag in memory repository %w", err)
return fmt.Errorf("unable to tag OCI registry: %w", err)
}

return artefact.Digest, nil
return nil
}

func (s *OCIArtefactService) Download(ctx context.Context, dg sha256.SHA256) (io.ReadCloser, error) {
Expand Down
80 changes: 75 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
sha "crypto/sha256"
"database/sql"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -803,15 +804,55 @@ func (s *Service) GetArtefactDiffs(ctx context.Context, req *connect.Request[ftl
}), nil
}

func (s *Service) UploadArtefact(ctx context.Context, req *connect.Request[ftlv1.UploadArtefactRequest]) (*connect.Response[ftlv1.UploadArtefactResponse], error) {
func (s *Service) UploadArtefact(ctx context.Context, stream *connect.ClientStream[ftlv1.UploadArtefactRequest]) (*connect.Response[ftlv1.UploadArtefactResponse], error) {
logger := log.FromContext(ctx)
firstMsg := NewOnceValue[*ftlv1.UploadArtefactRequest]()
wg, ctx := errgroup.WithContext(ctx)
logger.Debugf("Uploading artefact")
digest, err := s.storage.Upload(ctx, artefacts.Artefact{Content: req.Msg.Content})
r, w := io.Pipe()
// Read bytes from client and upload to OCI
wg.Go(func() error {
msg, ok := firstMsg.Get(ctx)
if !ok {
return nil
}
if msg.Size == 0 {
return fmt.Errorf("artefact size must be specified")
}
digest, err := sha256.ParseSHA256(hex.EncodeToString(msg.Digest))
if err != nil {
return fmt.Errorf("failed to parse digest: %w", err)
}
err = s.storage.Upload(ctx, artefacts.ArtefactUpload{
Digest: digest,
Size: msg.Size,
Content: r,
})
if err != nil {
return fmt.Errorf("failed to upload artefact: %w", err)
}
logger.Debugf("Created new artefact %s", digest)
return nil
})
// Stream bytes from client into the pipe
wg.Go(func() error {
for stream.Receive() {
msg := stream.Msg()
firstMsg.Set(msg)
if _, err := w.Write(msg.Chunk); err != nil {
return fmt.Errorf("failed to write chunk: %w", err)
}
}
if err := stream.Err(); err != nil {
return fmt.Errorf("failed to upload artefact: %w", err)
}
return nil
})
err := wg.Wait()
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to upload artefact: %w", err)
}
logger.Debugf("Created new artefact %s", digest)
return connect.NewResponse(&ftlv1.UploadArtefactResponse{Digest: digest[:]}), nil
return connect.NewResponse(&ftlv1.UploadArtefactResponse{}), nil
}

func (s *Service) getDeployment(ctx context.Context, dkey key.Deployment) (*schema.Module, error) {
Expand Down Expand Up @@ -910,3 +951,32 @@ func validateCallBody(body []byte, verb *schema.Verb, sch *schema.Schema) error
}
return nil
}

type OnceValue[T any] struct {
value T
ready chan struct{}
once sync.Once
}

func NewOnceValue[T any]() *OnceValue[T] {
return &OnceValue[T]{
ready: make(chan struct{}),
}
}

func (o *OnceValue[T]) Set(value T) {
o.once.Do(func() {
o.value = value
close(o.ready)
})
}

func (o *OnceValue[T]) Get(ctx context.Context) (T, bool) {
select {
case <-o.ready:
return o.value, true
case <-ctx.Done():
var zero T
return zero, false
}
}
Loading

0 comments on commit 1ae468b

Please sign in to comment.