Skip to content

Commit

Permalink
Unified HTTP client
Browse files Browse the repository at this point in the history
Right now, the snapshotter maintains a single HTTP client for fetching
SOCI artifacts and `n` clients for every layer in an image (used to
fetch spans/layers). Every client maintains its own credential cache,
meaning we have to re-authenticate an extra `n` times everytime we need
to fetch/refresh credentials. This change unifies both client creation
and authentication by creating a global `AuthClient` type that contains
a single `http.Client` and credential cache (one `AuthClient` per process).
The `AuthClient` is responsible for authenticating with registries and
sending the request out via  it's inner `retryable` HTTP client.

It also includes some smaller fixes like ensuring that we store the
final blob URL instead of the original base URL in our `httpFetcher`.

Signed-off-by: Yasin Turan <[email protected]>
  • Loading branch information
turan18 committed Dec 14, 2023
1 parent 7044591 commit 59f5919
Show file tree
Hide file tree
Showing 27 changed files with 1,287 additions and 709 deletions.
4 changes: 2 additions & 2 deletions config/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ type FSConfig struct {
// BlobConfig is config for layer blob management.
type BlobConfig struct {
ValidInterval int64 `toml:"valid_interval"`
CheckAlways bool `toml:"check_always"`
FetchTimeoutSec int64 `toml:"fetching_timeout_sec"`
ForceSingleRangeMode bool `toml:"force_single_range_mode"`
MaxRetries int `toml:"max_retries"`
MinWaitMsec int64 `toml:"min_wait_msec"`
MaxWaitMsec int64 `toml:"max_wait_msec"`
CheckAlways bool `toml:"check_always"`
ForceSingleRangeMode bool `toml:"force_single_range_mode"`

// MaxSpanVerificationRetries defines the number of additional times fetch
// will be invoked in case of span verification failure.
Expand Down
29 changes: 3 additions & 26 deletions fs/artifact_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ import (
"errors"
"fmt"
"io"
"net/http"
"strconv"

"github.com/awslabs/soci-snapshotter/config"
"github.com/awslabs/soci-snapshotter/service/keychain/dockerconfig"
"github.com/awslabs/soci-snapshotter/soci"
"github.com/awslabs/soci-snapshotter/soci/store"
socihttp "github.com/awslabs/soci-snapshotter/util/http"
"github.com/awslabs/soci-snapshotter/util/ioutils"
"github.com/containerd/containerd/reference"
"github.com/containerd/log"
Expand All @@ -37,7 +35,6 @@ import (
"oras.land/oras-go/v2/content"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
)

type Fetcher interface {
Expand Down Expand Up @@ -70,32 +67,12 @@ func newArtifactFetcher(refspec reference.Spec, localStore store.BasicStore, rem
}, nil
}

func newRemoteStore(refspec reference.Spec, httpConfig config.RetryableHTTPClientConfig) (*remote.Repository, error) {
func newRemoteStore(refspec reference.Spec, client *http.Client) (*remote.Repository, error) {
repo, err := remote.NewRepository(refspec.Locator)
if err != nil {
return nil, fmt.Errorf("cannot create repository %s: %w", refspec.Locator, err)
}

authClient := auth.Client{
Client: socihttp.NewRetryableClient(httpConfig),
Cache: auth.DefaultCache,
Credential: func(_ context.Context, host string) (auth.Credential, error) {
username, secret, err := dockerconfig.DockerCreds(host)
if err != nil {
return auth.EmptyCredential, err
}
if username == "" && secret != "" {
return auth.Credential{
RefreshToken: secret,
}, nil
}
return auth.Credential{
Username: username,
Password: secret,
}, nil
},
}
repo.Client = &authClient
repo.Client = client
return repo, nil
}

Expand Down
28 changes: 14 additions & 14 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"context"
"fmt"
golog "log"
"net/http"
"os/exec"
"sync"
"syscall"
Expand Down Expand Up @@ -138,8 +139,8 @@ func NewFilesystem(ctx context.Context, root string, cfg config.FSConfig, opts .

getSources := fsOpts.getSources
if getSources == nil {
getSources = source.FromDefaultLabels(func(refspec reference.Spec) (hosts []docker.RegistryHost, _ error) {
return docker.ConfigureDefaultRegistries(docker.WithPlainHTTP(docker.MatchLocalhost))(refspec.Hostname())
getSources = source.FromDefaultLabels(func(host string) (hosts []docker.RegistryHost, _ error) {
return docker.ConfigureDefaultRegistries(docker.WithPlainHTTP(docker.MatchLocalhost))(host)
})
}
ctx, store, err := store.NewContentStore(ctx, store.WithType(store.ContentStoreType(cfg.ContentStoreConfig.Type)), store.WithNamespace(cfg.ContentStoreConfig.Namespace))
Expand Down Expand Up @@ -206,7 +207,6 @@ func NewFilesystem(ctx context.Context, root string, cfg config.FSConfig, opts .
attrTimeout: attrTimeout,
entryTimeout: entryTimeout,
negativeTimeout: negativeTimeout,
httpConfig: cfg.RetryableHTTPClientConfig,
contentStore: store,
bgFetcher: bgFetcher,
mountTimeout: mountTimeout,
Expand All @@ -224,7 +224,7 @@ type sociContext struct {
fuseOperationCounter *layer.FuseOperationCounter
}

func (c *sociContext) Init(fsCtx context.Context, ctx context.Context, imageRef, indexDigest, imageManifestDigest string, store store.Store, fuseOpEmitWaitDuration time.Duration, httpConfig config.RetryableHTTPClientConfig) error {
func (c *sociContext) Init(fsCtx context.Context, ctx context.Context, imageRef, indexDigest, imageManifestDigest string, store store.Store, fuseOpEmitWaitDuration time.Duration, client *http.Client) error {
var retErr error
c.fetchOnce.Do(func() {
defer func() {
Expand All @@ -241,7 +241,7 @@ func (c *sociContext) Init(fsCtx context.Context, ctx context.Context, imageRef,
return
}

remoteStore, err := newRemoteStore(refspec, httpConfig)
remoteStore, err := newRemoteStore(refspec, client)
if err != nil {
retErr = err
return
Expand Down Expand Up @@ -309,7 +309,6 @@ type filesystem struct {
attrTimeout time.Duration
entryTimeout time.Duration
negativeTimeout time.Duration
httpConfig config.RetryableHTTPClientConfig
sociContexts sync.Map
contentStore store.Store
bgFetcher *bf.BackgroundFetcher
Expand All @@ -331,12 +330,13 @@ func (fs *filesystem) MountLocal(ctx context.Context, mountpoint string, labels
}
// download the target layer
s := src[0]
client := s.Hosts[0].Client
archive := NewLayerArchive()
refspec, err := reference.Parse(imageRef)
if err != nil {
return fmt.Errorf("cannot parse image ref (%s): %w", imageRef, err)
}
remoteStore, err := newRemoteStore(refspec, fs.httpConfig)
remoteStore, err := newRemoteStore(refspec, client)
if err != nil {
return fmt.Errorf("cannot create remote store: %w", err)
}
Expand All @@ -354,13 +354,13 @@ func (fs *filesystem) MountLocal(ctx context.Context, mountpoint string, labels
return nil
}

func (fs *filesystem) getSociContext(ctx context.Context, imageRef, indexDigest, imageManifestDigest string) (*sociContext, error) {
func (fs *filesystem) getSociContext(ctx context.Context, imageRef, indexDigest, imageManifestDigest string, client *http.Client) (*sociContext, error) {
cAny, _ := fs.sociContexts.LoadOrStore(imageManifestDigest, &sociContext{})
c, ok := cAny.(*sociContext)
if !ok {
return nil, fmt.Errorf("could not load index: fs soci context is invalid type for %s", indexDigest)
}
err := c.Init(fs.ctx, ctx, imageRef, indexDigest, imageManifestDigest, fs.contentStore, fs.fuseMetricsEmitWaitDuration, fs.httpConfig)
err := c.Init(fs.ctx, ctx, imageRef, indexDigest, imageManifestDigest, fs.contentStore, fs.fuseMetricsEmitWaitDuration, client)
return c, err
}

Expand All @@ -381,18 +381,18 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
return fmt.Errorf("unable to get image digest from labels")
}

c, err := fs.getSociContext(ctx, imageRef, sociIndexDigest, imgDigest)
if err != nil {
return fmt.Errorf("unable to fetch SOCI artifacts: %w", err)
}

// Get source information of this layer.
src, err := fs.getSources(labels)
if err != nil {
return err
} else if len(src) == 0 {
return fmt.Errorf("source must be passed")
}
client := src[0].Hosts[0].Client
c, err := fs.getSociContext(ctx, imageRef, sociIndexDigest, imgDigest, client)
if err != nil {
return fmt.Errorf("unable to fetch SOCI artifacts: %w", err)
}

// Resolve the target layer
var (
Expand Down
6 changes: 3 additions & 3 deletions fs/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func TestCheck(t *testing.T) {
layer: map[string]layer.Layer{
"test": bl,
},
getSources: source.FromDefaultLabels(func(refspec reference.Spec) (hosts []docker.RegistryHost, _ error) {
return docker.ConfigureDefaultRegistries(docker.WithPlainHTTP(docker.MatchLocalhost))(refspec.Hostname())
getSources: source.FromDefaultLabels(func(host string) (hosts []docker.RegistryHost, _ error) {
return docker.ConfigureDefaultRegistries(docker.WithPlainHTTP(docker.MatchLocalhost))(host)
}),
}
bl.success = true
Expand Down Expand Up @@ -90,7 +90,7 @@ func (l *breakableLayer) Check() error {
}
return nil
}
func (l *breakableLayer) Refresh(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) error {
func (l *breakableLayer) Refresh(ctx context.Context, hosts []docker.RegistryHost, refspec reference.Spec, desc ocispec.Descriptor) error {
if !l.success {
return fmt.Errorf("failed")
}
Expand Down
10 changes: 5 additions & 5 deletions fs/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ import (
"github.com/awslabs/soci-snapshotter/fs/reader"
"github.com/awslabs/soci-snapshotter/fs/remote"

"github.com/awslabs/soci-snapshotter/fs/source"
spanmanager "github.com/awslabs/soci-snapshotter/fs/span-manager"
"github.com/awslabs/soci-snapshotter/metadata"
"github.com/awslabs/soci-snapshotter/util/lrucache"
"github.com/awslabs/soci-snapshotter/util/namedmutex"
"github.com/awslabs/soci-snapshotter/ztoc"
"github.com/containerd/containerd/reference"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/log"
fusefs "github.com/hanwen/go-fuse/v2/fs"
digest "github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -90,7 +90,7 @@ type Layer interface {
Check() error

// Refresh refreshes the layer connection.
Refresh(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) error
Refresh(ctx context.Context, hosts []docker.RegistryHost, refspec reference.Spec, desc ocispec.Descriptor) error

// Verify verifies this layer using the passed TOC Digest.
// Nop if Verify() or SkipVerify() was already called.
Expand Down Expand Up @@ -230,7 +230,7 @@ func newCache(root string, cacheType string, cfg config.FSConfig) (cache.BlobCac
}

// Resolve resolves a layer based on the passed layer blob information.
func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc, sociDesc ocispec.Descriptor, opCounter *FuseOperationCounter, disableVerification bool, metadataOpts ...metadata.Option) (_ Layer, retErr error) {
func (r *Resolver) Resolve(ctx context.Context, hosts []docker.RegistryHost, refspec reference.Spec, desc, sociDesc ocispec.Descriptor, opCounter *FuseOperationCounter, disableVerification bool, metadataOpts ...metadata.Option) (_ Layer, retErr error) {
name := refspec.String() + "/" + desc.Digest.String()

// Wait if resolving this layer is already running. The result
Expand Down Expand Up @@ -354,7 +354,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
}

// resolveBlob resolves a blob based on the passed layer blob information.
func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) (_ *blobRef, retErr error) {
func (r *Resolver) resolveBlob(ctx context.Context, hosts []docker.RegistryHost, refspec reference.Spec, desc ocispec.Descriptor) (_ *blobRef, retErr error) {
name := refspec.String() + "/" + desc.Digest.String()

// Try to retrieve the blob from the underlying LRU cache.
Expand Down Expand Up @@ -450,7 +450,7 @@ func (l *layer) Check() error {
return l.blob.Check()
}

func (l *layer) Refresh(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) error {
func (l *layer) Refresh(ctx context.Context, hosts []docker.RegistryHost, refspec reference.Spec, desc ocispec.Descriptor) error {
if l.isClosed() {
return fmt.Errorf("layer is already closed")
}
Expand Down
4 changes: 2 additions & 2 deletions fs/layer/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ import (
"github.com/awslabs/soci-snapshotter/cache"
"github.com/awslabs/soci-snapshotter/fs/reader"
"github.com/awslabs/soci-snapshotter/fs/remote"
"github.com/awslabs/soci-snapshotter/fs/source"
spanmanager "github.com/awslabs/soci-snapshotter/fs/span-manager"
"github.com/awslabs/soci-snapshotter/metadata"
"github.com/awslabs/soci-snapshotter/util/testutil"
"github.com/awslabs/soci-snapshotter/ztoc"
"github.com/containerd/containerd/reference"
"github.com/containerd/containerd/remotes/docker"
"github.com/google/go-cmp/cmp"
fusefs "github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
Expand Down Expand Up @@ -392,7 +392,7 @@ func (tb *testBlobState) ReadAt(p []byte, offset int64, opts ...remote.Option) (
return 0, nil
}
func (tb *testBlobState) Cache(offset int64, size int64, opts ...remote.Option) error { return nil }
func (tb *testBlobState) Refresh(ctx context.Context, host source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) error {
func (tb *testBlobState) Refresh(ctx context.Context, hosts []docker.RegistryHost, refspec reference.Spec, desc ocispec.Descriptor) error {
return nil
}
func (tb *testBlobState) Close() error { return nil }
Expand Down
27 changes: 12 additions & 15 deletions fs/remote/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ import (
"sync"
"time"

"github.com/awslabs/soci-snapshotter/fs/source"
"github.com/containerd/containerd/reference"
"github.com/containerd/containerd/remotes/docker"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

Expand All @@ -59,7 +59,7 @@ type Blob interface {
Size() int64
FetchedSize() int64
ReadAt(p []byte, offset int64, opts ...Option) (int, error)
Refresh(ctx context.Context, host source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) error
Refresh(ctx context.Context, hosts []docker.RegistryHost, refspec reference.Spec, desc ocispec.Descriptor) error
Close() error
}

Expand All @@ -71,7 +71,6 @@ type blob struct {
lastCheck time.Time
lastCheckMu sync.Mutex
checkInterval time.Duration
fetchTimeout time.Duration

fetchedRegionSet regionSet
fetchedRegionSetMu sync.Mutex
Expand All @@ -83,14 +82,13 @@ type blob struct {
}

func makeBlob(fetcher fetcher, size int64, lastCheck time.Time, checkInterval time.Duration,
r *Resolver, fetchTimeout time.Duration) *blob {
r *Resolver) *blob {
return &blob{
fetcher: fetcher,
size: size,
lastCheck: lastCheck,
checkInterval: checkInterval,
resolver: r,
fetchTimeout: fetchTimeout,
}
}

Expand All @@ -110,13 +108,16 @@ func (b *blob) isClosed() bool {
return closed
}

func (b *blob) Refresh(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) error {
func (b *blob) Refresh(ctx context.Context, hosts []docker.RegistryHost, refspec reference.Spec, desc ocispec.Descriptor) error {
if b.isClosed() {
return fmt.Errorf("blob is already closed")
}

// refresh the fetcher
f, newSize, err := b.resolver.resolveFetcher(ctx, hosts, refspec, desc)
f, newSize, err := b.resolver.resolveFetcher(ctx, &fetcherConfig{
hosts: hosts,
refspec: refspec,
desc: desc,
})
if err != nil {
return err
}
Expand Down Expand Up @@ -211,20 +212,16 @@ func (b *blob) ReadAt(p []byte, offset int64, opts ...Option) (int, error) {
return len(p), nil
}

// fetchRegion fetches content from remote blob.
// It must be called from within fetchRange and need to ensure that it is inside the singleflight `Do` operation.
// fetchRegion fetches content from remote blob. It must be called from within fetchRange
// and need to ensure that it is inside the singleflight `Do` operation.
func (b *blob) fetchRegion(reg region, w io.Writer, fetched bool, opts *options) error {
// Fetcher can be suddenly updated so we take and use the snapshot of it for
// consistency.
b.fetcherMu.Lock()
fr := b.fetcher
b.fetcherMu.Unlock()

fetchCtx, cancel := context.WithTimeout(context.Background(), b.fetchTimeout)
defer cancel()
if opts.ctx != nil {
fetchCtx = opts.ctx
}
fetchCtx := context.Background()

var req []region
req = append(req, reg)
Expand Down
Loading

0 comments on commit 59f5919

Please sign in to comment.