Add full reader verification #1457

Draft: wants to merge 1 commit into main
2 changes: 2 additions & 0 deletions fs/backgroundfetcher/resolver.go

```diff
@@ -97,10 +97,12 @@ func (lr *sequentialLayerResolver) Resolve(ctx context.Context) (bool, error) {
 	}
 	if errors.Is(err, sm.ErrExceedMaxSpan) {
 		commonmetrics.MeasureLatencyInMilliseconds(commonmetrics.BackgroundFetch, lr.layerDigest, lr.base.start)
+		lr.SpanManager.MarkDownloaded()
 		return false, nil
 	}
 
 	commonmetrics.IncOperationCount(commonmetrics.BackgroundSpanFetchFailureCount, lr.layerDigest)
+	lr.SpanManager.MarkDownloaded()
 	return false, fmt.Errorf("error trying to fetch span with spanId = %d from layerDigest = %s: %w",
 		lr.nextSpanFetchID, lr.layerDigest.String(), err)
 }
```

16 changes: 13 additions & 3 deletions fs/fs.go

```diff
@@ -61,6 +61,7 @@ import (
 	"github.com/awslabs/soci-snapshotter/fs/layer"
 	commonmetrics "github.com/awslabs/soci-snapshotter/fs/metrics/common"
 	layermetrics "github.com/awslabs/soci-snapshotter/fs/metrics/layer"
+	"github.com/awslabs/soci-snapshotter/fs/reader"
 	"github.com/awslabs/soci-snapshotter/fs/remote"
 	"github.com/awslabs/soci-snapshotter/fs/source"
 	"github.com/awslabs/soci-snapshotter/idtools"
@@ -229,7 +230,7 @@ func NewFilesystem(ctx context.Context, root string, cfg config.FSConfig, opts ...
 	var bgFetcher *bf.BackgroundFetcher
 
 	if !cfg.BackgroundFetchConfig.Disable {
-		log.G(context.Background()).WithFields(logrus.Fields{
+		log.G(ctx).WithFields(logrus.Fields{
 			"fetchPeriod":   bgFetchPeriod,
 			"silencePeriod": bgSilencePeriod,
 			"maxQueueSize":  bgMaxQueueSize,
@@ -246,10 +247,19 @@ func NewFilesystem(ctx context.Context, root string, cfg config.FSConfig, opts ...
 	}
 	go bgFetcher.Run(context.Background())
```

Review thread on `go bgFetcher.Run(context.Background())`:

Contributor: Haven't looked too closely, and this is unrelated, but any chance we might want to pass ctx in here?

Contributor (Author): Yes, we probably should.

```diff
 	} else {
-		log.G(context.Background()).Info("background fetch is disabled")
+		log.G(ctx).Info("background fetch is disabled")
 	}
 
+	var readerVerifier *reader.Verifier
+	if !cfg.DisableVerification {
```

Review thread on `if !cfg.DisableVerification {`:

Contributor: Currently this reuses the TOML config variable that disables tar header verification. I think there's a world where one might want tar header integrity per file read (a somewhat hefty but not super resource-intensive process, particularly for lighter workloads) but not want to verify the entire image (a pretty resource-intensive process).

TL;DR: I think this should be its own config variable; a sketch of what that could look like follows.
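A minimal sketch of a separate knob, with hypothetical field and key names; only `DisableVerification` exists in the diff above, everything else here is illustrative:

```go
// Hypothetical config split (illustrative; not part of this PR).
// The existing knob keeps gating per-read tar header verification,
// while a new one gates the whole-layer background verification.
type FSConfig struct {
	// ... existing fields ...

	// DisableVerification skips tar header verification on each file read.
	DisableVerification bool `toml:"disable_verification"`

	// DisableFullVerification (hypothetical) skips only the background
	// whole-layer verification introduced by this PR.
	DisableFullVerification bool `toml:"disable_full_verification"`
}
```
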

log.G(ctx).Info("creating reader verifier")
readerVerifier = reader.NewVerifier(cfg.BackgroundFetchConfig.MaxQueueSize)
go readerVerifier.Run()
} else {
log.G(ctx).Info("reader verification is disabled")
}

r, err := layer.NewResolver(root, cfg, fsOpts.resolveHandlers, metadataStore, store, fsOpts.overlayOpaqueType, bgFetcher)
r, err := layer.NewResolver(root, cfg, fsOpts.resolveHandlers, metadataStore, store, fsOpts.overlayOpaqueType, bgFetcher, readerVerifier)
if err != nil {
return nil, fmt.Errorf("failed to setup resolver: %w", err)
}
Expand Down
10 changes: 9 additions & 1 deletion fs/layer/layer.go

```diff
@@ -128,11 +128,12 @@ type Resolver struct {
 	artifactStore     content.Storage
 	overlayOpaqueType OverlayOpaqueType
 	bgFetcher         *backgroundfetcher.BackgroundFetcher
+	readerVerifier    *reader.Verifier
 }
 
 // NewResolver returns a new layer resolver.
 func NewResolver(root string, cfg config.FSConfig, resolveHandlers map[string]remote.Handler,
-	metadataStore metadata.Store, artifactStore content.Storage, overlayOpaqueType OverlayOpaqueType, bgFetcher *backgroundfetcher.BackgroundFetcher) (*Resolver, error) {
+	metadataStore metadata.Store, artifactStore content.Storage, overlayOpaqueType OverlayOpaqueType, bgFetcher *backgroundfetcher.BackgroundFetcher, readerVerifier *reader.Verifier) (*Resolver, error) {
 	resolveResultEntry := cfg.ResolveResultEntry
 	if resolveResultEntry == 0 {
 		resolveResultEntry = defaultResolveResultEntry
@@ -175,6 +176,7 @@ func NewResolver(root string, cfg config.FSConfig, resolveHandlers map[string]remote.Handler, ...
 		artifactStore:     artifactStore,
 		overlayOpaqueType: overlayOpaqueType,
 		bgFetcher:         bgFetcher,
+		readerVerifier:    readerVerifier,
 	}, nil
 }
 
@@ -334,6 +336,12 @@ func (r *Resolver) Resolve(ctx context.Context, hosts []docker.RegistryHost, ref...
 		r.bgFetcher.Add(bgLayerResolver)
 	}
 	vr, err := reader.NewReader(meta, desc.Digest, spanManager, disableVerification)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read layer: %w", err)
 	}
+	if r.readerVerifier != nil {
+		err := r.readerVerifier.Add(vr)
+		if err != nil {
+			return nil, err
+		}
+	}
```

2 changes: 2 additions & 0 deletions fs/metrics/common/metrics.go

```diff
@@ -103,6 +103,8 @@ const (
 	FuseWhiteoutGetattrFailureCount = "fuse_whiteout_getattr_failure_count"
 	FuseUnknownFailureCount         = "fuse_unknown_operation_failure_count"
 
+	FileVerificationFailureCount = "file_verification_failure_count"
+
 	// TODO this metric is not available now. This needs to go down to BlobReader where the actual http call is issued
 	SynchronousBytesFetched = "synchronous_bytes_fetched"
 
```

46 changes: 45 additions & 1 deletion fs/reader/reader.go

```diff
@@ -43,6 +43,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"os"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -57,6 +58,8 @@
 	digest "github.com/opencontainers/go-digest"
 )
 
+var errReaderClosed = errors.New("reader is already closed")
+
 type Reader interface {
 	OpenFile(id uint32) (io.ReaderAt, error)
 	Metadata() metadata.Reader
@@ -71,6 +74,7 @@ func NewReader(r metadata.Reader, layerSha digest.Digest, spanManager *spanmanager.SpanManager, ...
 		r:                   r,
 		layerSha:            layerSha,
 		disableVerification: disableVerification,
+		closedC:             make(chan struct{}),
 	}, nil
 }
 
@@ -83,6 +87,7 @@ type reader struct {
 	lastReadTimeMu sync.Mutex
 
 	closed   bool
+	closedC  chan struct{}
 	closedMu sync.Mutex
 
 	disableVerification bool
@@ -92,6 +97,38 @@ func (gr *reader) Metadata() metadata.Reader {
 	return gr.r
 }
 
+func (gr *reader) Verify() error {
+	if gr.disableVerification {
+		return nil
+	}
+	queue := []uint32{gr.r.RootID()}
+	for len(queue) > 0 && !gr.isClosed() {
+		current := queue[0]
+		queue = queue[1:]
+		gr.r.ForeachChild(current, func(_ string, id uint32, _ os.FileMode) bool {
+			queue = append(queue, id)
+			return true
+		})
+		if current == gr.r.RootID() {
+			// The RootID does not map to a real file in the layer, so we don't verify it.
+			continue
+		}
+		f, err := gr.openFile(current)
+		if err != nil {
+			if errors.Is(err, errReaderClosed) {
+				// We're in cleanup; this is fine.
+				return nil
+			}
+			return err
+		}
+		err = f.Verify()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (gr *reader) setLastReadTime(lastReadTime time.Time) {
 	gr.lastReadTimeMu.Lock()
 	gr.lastReadTime = lastReadTime
@@ -106,8 +143,12 @@ func (gr *reader) LastOnDemandReadTime() time.Time {
 }
 
 func (gr *reader) OpenFile(id uint32) (io.ReaderAt, error) {
+	return gr.openFile(id)
+}
+
+func (gr *reader) openFile(id uint32) (*file, error) {
 	if gr.isClosed() {
-		return nil, fmt.Errorf("reader is already closed")
+		return nil, errReaderClosed
 	}
 	var fr metadata.File
 	fr, err := gr.r.OpenFile(id)
@@ -128,6 +169,7 @@ func (gr *reader) Close() (retErr error) {
 		return nil
 	}
 	gr.closed = true
+	close(gr.closedC)
 	if err := gr.r.Close(); err != nil {
 		retErr = errors.Join(retErr, err)
 	}
@@ -202,6 +244,8 @@ func (sf *file) Verify() (retErr error) {
 	defer func() {
 		if retErr == nil {
 			sf.verified.Store(true)
+		} else {
+			commonmetrics.IncOperationCount(commonmetrics.FileVerificationFailureCount, sf.gr.layerSha)
 		}
 	}()
```

106 changes: 106 additions & 0 deletions fs/reader/verifier.go (new file)

```go
/*
   Copyright The Soci Snapshotter Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package reader

import (
	"errors"
	"sync"

	"github.com/containerd/log"
)

var ErrNotReader = errors.New("reader is not a *reader.reader")

// Verifier is a rate-limited reader.reader verifier.
// It will verify all reader.readers added via `Add`
// once their span manager indicates that the layer is fully
// downloaded. Verifier verifies exactly one reader.reader at a time.
// Similar to the BackgroundFetcher, this is a concurrency limiter
// that keeps SOCI's background processes from competing with
// the containerized workload.
type Verifier struct {
	queue chan *reader

	closedMu sync.Mutex
	closed   bool
	closedC  chan struct{}
}

// NewVerifier creates a verifier with the specified max queue size.
func NewVerifier(maxQueueSize int) *Verifier {
	return &Verifier{
		queue:   make(chan *reader, maxQueueSize),
		closedC: make(chan struct{}),
	}
}

// Add adds a reader.reader to the verifier's queue once the
// reader.reader's span manager finishes downloading the layer.
//
// `r` must be a `*reader.reader` obtained via `reader.NewReader`.
func (v *Verifier) Add(r Reader) error {
	switch r := r.(type) {
	case *reader:
		go func() {
			select {
			case <-r.spanManager.DownloadedC:
				v.queue <- r
			case <-v.closedC:
			case <-r.closedC:
			}
		}()
		return nil
	default:
		return ErrNotReader
	}
}

// Run runs the verifier, verifying queued readers one at a time
// until the verifier is closed.
func (v *Verifier) Run() {
	for {
		select {
		case r := <-v.queue:
			if !r.isClosed() {
				l := log.L.WithField("layer", r.layerSha)
				if err := r.Verify(); err != nil {
					l.WithError(err).Error("failed to verify reader")
				} else {
					l.Debug("verified reader")
				}
			}
		case <-v.closedC:
			// Return here; otherwise the closed channel would be
			// selected again immediately, busy-looping forever.
			return
		}
	}
}

// Close closes the verifier and prevents new readers from queuing.
func (v *Verifier) Close() {
	v.closedMu.Lock()
	defer v.closedMu.Unlock()
	if !v.closed {
		v.closed = true
		close(v.closedC)
	}
}
```

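Taken together, the new file implies a small lifecycle for callers. A usage sketch under this PR's API; the helper function itself is illustrative, not code from the PR:

```go
package main

import (
	"github.com/awslabs/soci-snapshotter/fs/reader"
)

// startVerifier is an illustrative helper (not part of the PR) showing the
// Verifier lifecycle as wired up in fs.NewFilesystem: construct with a queue
// bound, run it in the background, then register each layer's reader.
func startVerifier(queueSize int, readers []reader.Reader) (*reader.Verifier, error) {
	v := reader.NewVerifier(queueSize)
	go v.Run() // verifies one queued reader at a time

	for _, r := range readers {
		// Add only enqueues r after its span manager closes DownloadedC;
		// it returns ErrNotReader if r was not produced by reader.NewReader.
		if err := v.Add(r); err != nil {
			v.Close()
			return nil, err
		}
	}
	return v, nil // call v.Close() on shutdown to stop Run and the Add waiters
}
```
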
14 changes: 14 additions & 0 deletions fs/span-manager/span_manager.go

```diff
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"io"
 	"runtime"
+	"sync"
 
 	"github.com/awslabs/soci-snapshotter/cache"
 	"github.com/awslabs/soci-snapshotter/ztoc"
@@ -72,6 +73,9 @@ type SpanManager struct {
 	spans                             []*span
 	ztoc                              *ztoc.Ztoc
 	maxSpanVerificationFailureRetries int
+	downloaded                        bool
+	downloadedMu                      sync.Mutex
+	DownloadedC                       chan struct{}
 }
 
 type spanInfo struct {
@@ -117,6 +121,7 @@ func New(ztoc *ztoc.Ztoc, r *io.SectionReader, cache cache.BlobCache, retries int, ...
 		spans:                             spans,
 		ztoc:                              ztoc,
 		maxSpanVerificationFailureRetries: retries,
+		DownloadedC:                       make(chan struct{}),
 	}
 	if m.maxSpanVerificationFailureRetries < 0 {
 		m.maxSpanVerificationFailureRetries = defaultSpanVerificationFailureRetries
@@ -314,6 +319,15 @@ func (m *SpanManager) getSpanContent(spanID compression.SpanID, offsetStart, offsetEnd ...
 	return io.NopCloser(buf), nil
 }
 
+func (m *SpanManager) MarkDownloaded() {
+	m.downloadedMu.Lock()
+	if !m.downloaded {
+		m.downloaded = true
+		close(m.DownloadedC)
+	}
+	m.downloadedMu.Unlock()
+}
+
 // fetchAndCacheSpan fetches a span, uncompresses the span if `uncompress == true`,
 // caches and returns the span content. The span state is set to `fetched/uncompressed`,
 // depending on if `uncompress` is enabled.
```

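MarkDownloaded uses the close-once channel idiom: closing DownloadedC wakes every current and future waiter (such as the goroutine spawned by Verifier.Add), and the mutex-guarded boolean ensures the channel is never closed twice, which would panic. A self-contained sketch of the same pattern, independent of the snapshotter types:

```go
package main

import (
	"fmt"
	"sync"
)

// signal is a once-closable broadcast, the same shape as SpanManager's
// downloaded/downloadedMu/DownloadedC fields.
type signal struct {
	mu   sync.Mutex
	done bool
	C    chan struct{}
}

func newSignal() *signal { return &signal{C: make(chan struct{})} }

// fire closes C at most once; later calls are no-ops instead of panics.
func (s *signal) fire() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.done {
		s.done = true
		close(s.C)
	}
}

func main() {
	s := newSignal()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-s.C // every waiter unblocks when C is closed
			fmt.Println("waiter", id, "woken")
		}(i)
	}
	s.fire()
	s.fire() // safe: second call is a no-op
	wg.Wait()
}
```
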
4 changes: 0 additions & 4 deletions metadata/reader.go

```diff
@@ -617,10 +617,6 @@ func (r *reader) OpenFile(id uint32) (File, error) {
 			return fmt.Errorf("failed to get file bucket %d: %w", id, err)
 		}
 		size, _ = binary.Varint(b.Get(bucketKeySize))
-		m, _ := binary.Uvarint(b.Get(bucketKeyMode))
-		if !os.FileMode(uint32(m)).IsRegular() {
-			return fmt.Errorf("%q is not a regular file", id)
-		}
```

Review thread on the removed block:

Contributor: Why did this chunk of code get removed?

Contributor (Author): This prevented calling metadataReader.OpenFile on a non-regular file. That's probably not the right thing to do, because a metadata.File is the information about a file's uncompressed offset/size and tar header offset/size within the uncompressed layer. That applies to all files, not just regular files.

In a practical sense, the big issue is that reader.Verify can't verify directories without this change.

```diff
 		metadataEntries, err := getMetadataBucket(tx, r.fsID)
 		if err != nil {
 			return fmt.Errorf("metadata bucket of %q not found for opening %d: %w", r.fsID, id, err)
```

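To make the author's explanation above concrete, this is roughly the per-entry information a metadata.File describes; the field names are illustrative, not the project's actual API, and none of it is specific to regular files:

```go
// Illustrative only (hypothetical names): the layout data a metadata.File
// describes for one entry of the uncompressed layer. Directories and
// symlinks have tar headers and offsets too, so restricting OpenFile to
// regular files was an artificial limit.
type entryLayout struct {
	UncompressedOffset int64 // where the entry's content starts in the uncompressed layer
	UncompressedSize   int64 // length of that content
	TarHeaderOffset    int64 // where the entry's tar header starts
	TarHeaderSize      int64 // length of the tar header
}
```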