Skip to content

Commit

Permalink
fix(sync): also sync on demand digests, not only tags, closes #902 (#932
Browse files Browse the repository at this point in the history
)

Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk authored Oct 27, 2022
1 parent c6ffbce commit 2d877aa
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 46 deletions.
70 changes: 39 additions & 31 deletions pkg/extensions/sync/on_demand.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func (di *demandedImages) delete(key string) {
}

func OneImage(cfg Config, storeController storage.StoreController,
repo, tag string, isArtifact bool, log log.Logger,
repo, reference string, isArtifact bool, log log.Logger,
) error {
// guard against multiple parallel requests
demandedImage := fmt.Sprintf("%s:%s", repo, tag)
demandedImage := fmt.Sprintf("%s:%s", repo, reference)
// loadOrStore image-based channel
imageChannel, found := demandedImgs.loadOrStoreChan(demandedImage, make(chan error))
// if value found wait on channel receive or close
Expand All @@ -73,7 +73,7 @@ func OneImage(cfg Config, storeController storage.StoreController,
defer demandedImgs.delete(demandedImage)
defer close(imageChannel)

go syncOneImage(imageChannel, cfg, storeController, repo, tag, isArtifact, log)
go syncOneImage(imageChannel, cfg, storeController, repo, reference, isArtifact, log)

err, ok := <-imageChannel
if !ok {
Expand All @@ -84,7 +84,7 @@ func OneImage(cfg Config, storeController storage.StoreController,
}

func syncOneImage(imageChannel chan error, cfg Config, storeController storage.StoreController,
localRepo, tag string, isArtifact bool, log log.Logger,
localRepo, reference string, isArtifact bool, log log.Logger,
) {
var credentialsFile CredentialsFile

Expand Down Expand Up @@ -161,21 +161,21 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
options := getCopyOptions(upstreamCtx, localCtx)

// demanded 'image' is a signature
if isCosignTag(tag) {
if isCosignTag(reference) {
// at tis point we should already have images synced, but not their signatures.
// is cosign signature
cosignManifest, err := sig.getCosignManifest(upstreamRepo, tag)
cosignManifest, err := sig.getCosignManifest(upstreamRepo, reference)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't get upstream image %s:%s:%s cosign manifest", upstreamURL, upstreamRepo, tag)
Err(err).Msgf("couldn't get upstream image %s:%s:%s cosign manifest", upstreamURL, upstreamRepo, reference)

continue
}

err = sig.syncCosignSignature(localRepo, upstreamRepo, tag, cosignManifest)
err = sig.syncCosignSignature(localRepo, upstreamRepo, reference, cosignManifest)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't copy upstream image cosign signature %s/%s:%s", upstreamURL, upstreamRepo, tag)
Err(err).Msgf("couldn't copy upstream image cosign signature %s/%s:%s", upstreamURL, upstreamRepo, reference)

continue
}
Expand All @@ -185,18 +185,18 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
return
} else if isArtifact {
// is notary signature
refs, err := sig.getNotaryRefs(upstreamRepo, tag)
refs, err := sig.getNotaryRefs(upstreamRepo, reference)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't get upstream image %s/%s:%s notary references", upstreamURL, upstreamRepo, tag)
Err(err).Msgf("couldn't get upstream image %s/%s:%s notary references", upstreamURL, upstreamRepo, reference)

continue
}

err = sig.syncNotarySignature(localRepo, upstreamRepo, tag, refs)
err = sig.syncNotarySignature(localRepo, upstreamRepo, reference, refs)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, upstreamRepo, tag)
Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, upstreamRepo, reference)

continue
}
Expand All @@ -214,13 +214,13 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
copyOptions: options,
}

skipped, copyErr := syncRun(regCfg, localRepo, upstreamRepo, tag, syncContextUtils, sig, log)
skipped, copyErr := syncRun(regCfg, localRepo, upstreamRepo, reference, syncContextUtils, sig, log)
if skipped {
continue
}

// key used to check if we already have a go routine syncing this image
demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, upstreamRepo, tag)
demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, upstreamRepo, reference)

if copyErr != nil {
// don't retry in background if maxretry is 0
Expand Down Expand Up @@ -249,14 +249,18 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
time.Sleep(retryOptions.Delay)

if err = retry.RetryIfNecessary(context.Background(), func() error {
_, err := syncRun(regCfg, localRepo, upstreamRepo, tag, syncContextUtils, sig, log)
_, err := syncRun(regCfg, localRepo, upstreamRepo, reference, syncContextUtils, sig, log)

return err
}, retryOptions); err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("sync routine: error while copying image %s", demandedImageRef)
}
}()
} else {
imageChannel <- nil

return
}
}
}
Expand All @@ -265,24 +269,28 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S
}

func syncRun(regCfg RegistryConfig,
localRepo, upstreamRepo, tag string, utils syncContextUtils, sig *signaturesCopier,
localRepo, upstreamRepo, reference string, utils syncContextUtils, sig *signaturesCopier,
log log.Logger,
) (bool, error) {
upstreamImageRef, err := getImageRef(utils.upstreamAddr, upstreamRepo, tag)
upstreamImageDigest, refIsDigest := parseDigest(reference)

upstreamImageRef, err := getImageRef(utils.upstreamAddr, upstreamRepo, reference)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("error creating docker reference for repository %s/%s:%s",
utils.upstreamAddr, upstreamRepo, tag)
utils.upstreamAddr, upstreamRepo, reference)

return false, err
}

upstreamImageDigest, err := docker.GetDigest(context.Background(), utils.upstreamCtx, upstreamImageRef)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference())
if !refIsDigest {
upstreamImageDigest, err = docker.GetDigest(context.Background(), utils.upstreamCtx, upstreamImageRef)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference())

return false, err
return false, err
}
}

// get upstream signatures
Expand Down Expand Up @@ -316,11 +324,11 @@ func syncRun(regCfg RegistryConfig,
log.Error().Err(err).Msgf("couldn't get localCachePath for %s", localRepo)
}

localImageRef, err := getLocalImageRef(localCachePath, localRepo, tag)
localImageRef, err := getLocalImageRef(localCachePath, localRepo, reference)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s",
localCachePath, localRepo, tag)
localCachePath, localRepo, reference)

return false, err
}
Expand All @@ -338,32 +346,32 @@ func syncRun(regCfg RegistryConfig,
return false, err
}

err = pushSyncedLocalImage(localRepo, tag, localCachePath, imageStore, log)
err = pushSyncedLocalImage(localRepo, reference, localCachePath, imageStore, log)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("error while pushing synced cached image %s",
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag))
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, reference))

return false, err
}

err = sig.syncCosignSignature(localRepo, upstreamRepo, upstreamImageDigest.String(), cosignManifest)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't copy image cosign signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, tag)
Err(err).Msgf("couldn't copy image cosign signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, reference)

return false, err
}

err = sig.syncNotarySignature(localRepo, upstreamRepo, upstreamImageDigest.String(), refs)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msgf("couldn't copy image notary signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, tag)
Err(err).Msgf("couldn't copy image notary signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, reference)

return false, err
}

log.Info().Msgf("successfully synced %s/%s:%s", utils.upstreamAddr, upstreamRepo, tag)
log.Info().Msgf("successfully synced %s/%s:%s", utils.upstreamAddr, upstreamRepo, reference)

return false, nil
}
1 change: 0 additions & 1 deletion pkg/extensions/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ func getCopyOptions(upstreamCtx, localCtx *types.SystemContext) copy.Options {
SourceCtx: upstreamCtx,
ReportWriter: io.Discard,
ForceManifestMIMEType: ispec.MediaTypeImageManifest, // force only oci manifest MIME type
PreserveDigests: true,
ImageListSelection: copy.CopyAllImages,
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/extensions/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2865,6 +2865,56 @@ func TestOnDemandRetryGoroutine(t *testing.T) {
})
}

func TestOnDemandWithDigest(t *testing.T) {
Convey("Verify ondemand sync retries in background on error", t, func() {
_, srcBaseURL, _, _, _ := startUpstreamServer(t, false, false)

regex := ".*"
semver := true
var tlsVerify bool

syncRegistryConfig := sync.RegistryConfig{
Content: []sync.Content{
{
Prefix: testImage,
Tags: &sync.Tags{
Regex: &regex,
Semver: &semver,
},
},
},
URLs: []string{srcBaseURL},
OnDemand: true,
TLSVerify: &tlsVerify,
CertDir: "",
}

defaultVal := true
syncConfig := &sync.Config{
Enable: &defaultVal,
Registries: []sync.RegistryConfig{syncRegistryConfig},
}

dctlr, destBaseURL, destDir, destClient := startDownstreamServer(t, false, syncConfig)
defer os.RemoveAll(destDir)

defer func() {
dctlr.Shutdown()
}()

// get manifest digest from source
resp, err := destClient.R().Get(srcBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 200)

digest := godigest.FromBytes(resp.Body())

resp, err = destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + digest.String())
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 200)
})
}

func TestOnDemandRetryGoroutineErr(t *testing.T) {
Convey("Verify ondemand sync retries in background on error", t, func() {
regex := ".*"
Expand Down
55 changes: 41 additions & 14 deletions pkg/extensions/sync/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,28 +328,28 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede
return client, registryURL, nil
}

func pushSyncedLocalImage(localRepo, tag, localCachePath string,
func pushSyncedLocalImage(localRepo, reference, localCachePath string,
imageStore storage.ImageStore, log log.Logger,
) error {
log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, tag)
log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, reference)

metrics := monitoring.NewMetricsServer(false, log)

cacheImageStore := local.NewImageStore(localCachePath, false,
storage.DefaultGCDelay, false, false, log, metrics, nil)

manifestContent, _, mediaType, err := cacheImageStore.GetImageManifest(localRepo, tag)
manifestContent, _, mediaType, err := cacheImageStore.GetImageManifest(localRepo, reference)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), localRepo)).
Msg("couldn't find index.json")
Msgf("couldn't find %s manifest", reference)

return err
}

// is image manifest
if mediaType == ispec.MediaTypeImageManifest {
if err := copyManifest(localRepo, manifestContent, tag, cacheImageStore, imageStore, log); err != nil {
if err := copyManifest(localRepo, manifestContent, reference, cacheImageStore, imageStore, log); err != nil {
if errors.Is(err, zerr.ErrImageLintAnnotations) {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msg("couldn't upload manifest because of missing annotations")
Expand Down Expand Up @@ -397,7 +397,7 @@ func pushSyncedLocalImage(localRepo, tag, localCachePath string,
}
}

_, err = imageStore.PutImageManifest(localRepo, tag, mediaType, manifestContent)
_, err = imageStore.PutImageManifest(localRepo, reference, mediaType, manifestContent)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
Err(err).Msg("couldn't upload manifest")
Expand Down Expand Up @@ -486,18 +486,28 @@ func StripRegistryTransport(url string) string {
}

// get an ImageReference given the registry, repo and tag.
func getImageRef(registryDomain, repo, tag string) (types.ImageReference, error) {
func getImageRef(registryDomain, repo, ref string) (types.ImageReference, error) {
repoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", registryDomain, repo))
if err != nil {
return nil, err
}

taggedRepoRef, err := reference.WithTag(repoRef, tag)
if err != nil {
return nil, err
var namedRepoRef reference.Named

digest, ok := parseDigest(ref)
if ok {
namedRepoRef, err = reference.WithDigest(repoRef, digest)
if err != nil {
return nil, err
}
} else {
namedRepoRef, err = reference.WithTag(repoRef, ref)
if err != nil {
return nil, err
}
}

imageRef, err := docker.NewReference(taggedRepoRef)
imageRef, err := docker.NewReference(namedRepoRef)
if err != nil {
return nil, err
}
Expand All @@ -506,15 +516,20 @@ func getImageRef(registryDomain, repo, tag string) (types.ImageReference, error)
}

// get a local ImageReference used to temporary store one synced image.
func getLocalImageRef(localCachePath, repo, tag string) (types.ImageReference, error) {
func getLocalImageRef(localCachePath, repo, reference string) (types.ImageReference, error) {
if _, err := os.ReadDir(localCachePath); err != nil {
return nil, err
}

localRepo := path.Join(localCachePath, repo)
localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, tag)

localImageRef, err := layout.ParseReference(localTaggedRepo)
_, refIsDigest := parseDigest(reference)

if !refIsDigest {
localRepo = fmt.Sprintf("%s:%s", localRepo, reference)
}

localImageRef, err := layout.ParseReference(localRepo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -579,6 +594,18 @@ func canSkipImage(repo, tag string, digest godigest.Digest, imageStore storage.I
return true, nil
}

// parse a reference, return its digest and if it's valid.
func parseDigest(reference string) (godigest.Digest, bool) {
var ok bool

d, err := godigest.Parse(reference)
if err == nil {
ok = true
}

return d, ok
}

func manifestsEqual(manifest1, manifest2 ispec.Manifest) bool {
if manifest1.Config.Digest == manifest2.Config.Digest &&
manifest1.Config.MediaType == manifest2.Config.MediaType &&
Expand Down

0 comments on commit 2d877aa

Please sign in to comment.