Skip to content

Commit

Permalink
Add non-lazy-loaded-layer verification
Browse files Browse the repository at this point in the history
Though we attempt to mimic overlay in MountLocal, we do not do any
verification once we receive the content. This change adds functionality
for this, and the option to disable it as well.

Signed-off-by: David Son <[email protected]>
  • Loading branch information
sondavidb committed Feb 3, 2025
1 parent 3b846ab commit 0206b54
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 14 deletions.
1 change: 1 addition & 0 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ max_concurrency=0 # Actually zero
no_prometheus=false
mount_timeout_sec=0
fuse_metrics_emit_wait_duration_sec=0
verify_local_mounts=true

## config/config.go Config

Expand Down
4 changes: 4 additions & 0 deletions config/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type FSConfig struct {
MountTimeoutSec int64 `toml:"mount_timeout_sec"`
FuseMetricsEmitWaitDurationSec int64 `toml:"fuse_metrics_emit_wait_duration_sec"`

// VerifyLocalMounts will verify the shasum of compressed and uncompressed
// content for layers pulled ahead of time (i.e. non-FUSE mounts)
VerifyLocalMounts bool `toml:"verify_local_mounts" default:"true"`

RetryableHTTPClientConfig `toml:"http"`
BlobConfig `toml:"blob"`

Expand Down
97 changes: 96 additions & 1 deletion fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ package fs

import (
"context"
"encoding/json"
"fmt"
"io"
golog "log"
Expand Down Expand Up @@ -81,6 +82,7 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
ociremote "oras.land/oras-go/v2/registry/remote"
)

var (
Expand Down Expand Up @@ -257,6 +259,11 @@ func NewFilesystem(ctx context.Context, root string, cfg config.FSConfig, opts .
pr := newPreresolver(fsOpts.maxConcurrency)
pr.Start(ctx)

var manifests *manifestMap
if cfg.VerifyLocalMounts {
manifests = new(manifestMap)
}

var ns *metrics.Namespace
if !cfg.NoPrometheus {
ns = metrics.NewNamespace("soci", "fs", nil)
Expand Down Expand Up @@ -292,6 +299,7 @@ func NewFilesystem(ctx context.Context, root string, cfg config.FSConfig, opts .
mountTimeout: mountTimeout,
fuseMetricsEmitWaitDuration: fuseMetricsEmitWaitDuration,
pr: pr,
manifests: manifests,
}, nil
}

Expand Down Expand Up @@ -377,6 +385,29 @@ func (c *sociContext) populateImageLayerToSociMapping(sociIndex *soci.Index) {
}
}

// manifestMap is a wrapper for an atomic map
// of a manifest digest to its appropriate config
// key: manifest digest
// val: map where
//
// key: compressed layer digest string
// val: uncompressed layer digest.Digest
type manifestMap struct {
m sync.Map
}

func (m *manifestMap) Store(key string, val map[string]digest.Digest) {
m.m.Store(key, val)
}

func (m *manifestMap) Load(key string) (map[string]digest.Digest, bool) {
val, ok := m.m.Load(key)
if ok {
return val.(map[string]digest.Digest), ok
}
return map[string]digest.Digest{}, false
}

type filesystem struct {
ctx context.Context
resolver *layer.Resolver
Expand All @@ -395,6 +426,7 @@ type filesystem struct {
mountTimeout time.Duration
fuseMetricsEmitWaitDuration time.Duration
pr *preresolver
manifests *manifestMap
}

func (fs *filesystem) MountLocal(ctx context.Context, mountpoint string, labels map[string]string, mounts []mount.Mount) error {
Expand Down Expand Up @@ -425,7 +457,12 @@ func (fs *filesystem) MountLocal(ctx context.Context, mountpoint string, labels
if err != nil {
return fmt.Errorf("cannot create fetcher: %w", err)
}
unpacker := NewLayerUnpacker(fetcher, archive)
diffIDMap, err := fs.getLayerDiffMap(ctx, labels, remoteStore, fetcher)
if err != nil {
return fmt.Errorf("error getting layer diff IDs: %v", err)
}

unpacker := NewLayerUnpacker(fetcher, archive, diffIDMap)
desc := s.Target

// If no descriptor size is given, resolve the layer
Expand All @@ -449,6 +486,64 @@ func (fs *filesystem) MountLocal(ctx context.Context, mountpoint string, labels
return nil
}

// getLayerDiffMap will return the mapping of a compressed layer's shasum
// If VerifyLocalMounts is disabled, this will return nil with no error
func (fs *filesystem) getLayerDiffMap(ctx context.Context, labels map[string]string, remoteStore *ociremote.Repository, fetcher *artifactFetcher) (map[string]digest.Digest, error) {
if fs.manifests == nil {
return nil, nil
}

manDigest, ok := labels[ctdsnapshotters.TargetManifestDigestLabel]
if !ok {
return nil, fmt.Errorf("no manifest label attached to image")
}

diffIDMap, ok := fs.manifests.Load(manDigest)
if !ok {
manifestReq, err := fetcher.resolve(ctx, ocispec.Descriptor{Digest: digest.Digest(manDigest)})
if err != nil {
return nil, fmt.Errorf("error resolving manifest: %v", err)
}
rc, err := remoteStore.Fetch(ctx, manifestReq)
if err != nil {
return nil, fmt.Errorf("error fetching manifest from upstream: %v", err)
}

b, _ := io.ReadAll(rc)
manifest := ocispec.Manifest{}
err = json.Unmarshal(b, &manifest)
if err != nil {
return nil, fmt.Errorf("error unmarshalling manifest JSON: %v", err)
}
rc, err = remoteStore.Fetch(ctx, manifest.Config)
if err != nil {
return nil, fmt.Errorf("error fetching manifest config from upstream: %v", err)
}

b, _ = io.ReadAll(rc)
imgConfig := ocispec.Image{}

err = json.Unmarshal(b, &imgConfig)
if err != nil {
return nil, fmt.Errorf("error unmarshalling image config JSON: %v", err)
}

diffIDShas := imgConfig.RootFS.DiffIDs
compressedShas := manifest.Layers
if len(diffIDShas) != len(compressedShas) {
return nil, fmt.Errorf("mismatch between manifest layers and diff IDs")
}

for i := range len(diffIDShas) {
diffIDMap[compressedShas[i].Digest.String()] = diffIDShas[i]
}

fs.manifests.Store(manDigest, diffIDMap)
}

return diffIDMap, nil
}

func (fs *filesystem) getSociContext(ctx context.Context, imageRef, indexDigest, imageManifestDigest string, client *http.Client) (*sociContext, error) {
cAny, _ := fs.sociContexts.LoadOrStore(imageManifestDigest, &sociContext{})
c, ok := cAny.(*sociContext)
Expand Down
69 changes: 60 additions & 9 deletions fs/unpacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/containerd/containerd/archive"
"github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/mount"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

Expand All @@ -39,7 +40,7 @@ type Unpacker interface {
type Archive interface {
// Apply decompresses the compressed stream represented by reader `r` and
// applies it to the directory `root`.
Apply(ctx context.Context, root string, r io.Reader, opts ...archive.ApplyOpt) (int64, error)
Apply(ctx context.Context, root string, r io.Reader, verifier *layerVerifier, opts ...archive.ApplyOpt) (int64, error)
}

type layerArchive struct {
Expand All @@ -49,26 +50,58 @@ func NewLayerArchive() Archive {
return &layerArchive{}
}

func (la *layerArchive) Apply(ctx context.Context, root string, r io.Reader, opts ...archive.ApplyOpt) (int64, error) {
// wrapReader uses a TeeReader to feed content into verifier (compressed or uncompressed).
// If verification is disabled, this is a no-op.
func wrapReader(r io.Reader, verifier *layerVerifier, compressed bool) io.Reader {
if verifier != nil {
if compressed {
r = io.TeeReader(r, verifier.compressed)
} else {
r = io.TeeReader(r, verifier.uncompressed)
}
}
return r
}

func (la *layerArchive) Apply(ctx context.Context, root string, r io.Reader, verifier *layerVerifier, opts ...archive.ApplyOpt) (int64, error) {
// we use containerd implementation here
// decompress first and then apply
r = wrapReader(r, verifier, true)
decompressReader, err := compression.DecompressStream(r)
if err != nil {
return 0, fmt.Errorf("cannot decompress the stream: %w", err)
}
defer decompressReader.Close()
return archive.Apply(ctx, root, decompressReader, opts...)

r = wrapReader(decompressReader, verifier, false)
n, err := archive.Apply(ctx, root, r, opts...)
if err != nil {
return 0, err
}

if verifier != nil {
if !verifier.compressed.Verified() {
return 0, fmt.Errorf("compressed digests did not match")
}
if !verifier.uncompressed.Verified() {
return 0, fmt.Errorf("uncompressed digests did not match")
}
}

return n, nil
}

type layerUnpacker struct {
fetcher Fetcher
archive Archive
fetcher Fetcher
archive Archive
diffIDMap map[string]digest.Digest
}

func NewLayerUnpacker(fetcher Fetcher, archive Archive) Unpacker {
func NewLayerUnpacker(fetcher Fetcher, archive Archive, diffIDMap map[string]digest.Digest) Unpacker {
return &layerUnpacker{
fetcher: fetcher,
archive: archive,
fetcher: fetcher,
archive: archive,
diffIDMap: diffIDMap,
}
}

Expand Down Expand Up @@ -100,14 +133,32 @@ func (lu *layerUnpacker) Unpack(ctx context.Context, desc ocispec.Descriptor, mo
if len(parents) > 0 {
opts = append(opts, archive.WithParents(parents))
}
_, err = lu.archive.Apply(ctx, mountpoint, rc, opts...)

var verifier *layerVerifier
if lu.diffIDMap != nil {
uncompressedDigest, ok := lu.diffIDMap[desc.Digest.String()]
if !ok {
return fmt.Errorf("error getting diff ID")
}

verifier = &layerVerifier{
compressed: desc.Digest.Verifier(),
uncompressed: uncompressedDigest.Verifier(),
}
}
_, err = lu.archive.Apply(ctx, mountpoint, rc, verifier, opts...)
if err != nil {
return fmt.Errorf("cannot apply layer: %w", err)
}

return nil
}

type layerVerifier struct {
compressed digest.Verifier
uncompressed digest.Verifier
}

func getLayerParents(options []string) (lower []string, err error) {
const lowerdirPrefix = "lowerdir="

Expand Down
Loading

0 comments on commit 0206b54

Please sign in to comment.