From 3ba44210270b214b005b6f75785190119bd51678 Mon Sep 17 00:00:00 2001 From: Keith Zantow Date: Fri, 31 Jan 2025 14:06:53 -0500 Subject: [PATCH] feat: run file hash algorithms in parallel Signed-off-by: Keith Zantow --- cmd/syft/internal/commands/scan.go | 3 ++ go.mod | 1 + go.sum | 2 + internal/file/digest.go | 6 ++- internal/file/parallel_writer.go | 43 +++++++++++++++++++++ syft/file/cataloger/filedigest/cataloger.go | 6 +-- syft/pkg/cataloger/java/archive_parser.go | 6 +-- syft/source/filesource/file_source.go | 3 +- 8 files changed, 61 insertions(+), 9 deletions(-) create mode 100644 internal/file/parallel_writer.go diff --git a/cmd/syft/internal/commands/scan.go b/cmd/syft/internal/commands/scan.go index 08b8b185a8c..98d67ead917 100644 --- a/cmd/syft/internal/commands/scan.go +++ b/cmd/syft/internal/commands/scan.go @@ -15,6 +15,7 @@ import ( "github.com/anchore/clio" "github.com/anchore/fangs" "github.com/anchore/go-collections" + "github.com/anchore/go-sync" "github.com/anchore/stereoscope" "github.com/anchore/stereoscope/pkg/image" "github.com/anchore/syft/cmd/syft/internal/options" @@ -184,6 +185,8 @@ func runScan(ctx context.Context, id clio.Identification, opts *scanOptions, use } } + ctx = sync.SetContextExecutor(ctx, sync.NewExecutor(opts.Parallelism)) + src, err := getSource(ctx, &opts.Catalog, userInput, sources...) 
if err != nil { diff --git a/go.mod b/go.mod index 218a0ddb25e..0aa47c69ce3 100644 --- a/go.mod +++ b/go.mod @@ -90,6 +90,7 @@ require ( github.com/OneOfOne/xxhash v1.2.8 github.com/adrg/xdg v0.5.3 github.com/anchore/archiver/v3 v3.5.3-0.20241210171143-5b1d8d1c7c51 + github.com/anchore/go-sync v0.0.0-20241216143621-0b0fc28c752f github.com/hashicorp/hcl/v2 v2.23.0 github.com/magiconair/properties v1.8.9 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 diff --git a/go.sum b/go.sum index c4df6466733..a15ec89b990 100644 --- a/go.sum +++ b/go.sum @@ -110,6 +110,8 @@ github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb h1:iDMnx6LIj github.com/anchore/go-macholibre v0.0.0-20220308212642-53e6d0aaf6fb/go.mod h1:DmTY2Mfcv38hsHbG78xMiTDdxFtkHpgYNVDPsF2TgHk= github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092 h1:aM1rlcoLz8y5B2r4tTLMiVTrMtpfY0O8EScKJxaSaEc= github.com/anchore/go-struct-converter v0.0.0-20221118182256-c68fdcfa2092/go.mod h1:rYqSE9HbjzpHTI74vwPvae4ZVYZd1lue2ta6xHPdblA= +github.com/anchore/go-sync v0.0.0-20241216143621-0b0fc28c752f h1:0TPfHMmCRZedeu6qB33jxnbt0hX8waAzvVyPcClGiN8= +github.com/anchore/go-sync v0.0.0-20241216143621-0b0fc28c752f/go.mod h1:zrREDHPQOL+4BqMNkp529xdXenbTtafFhSclMdWSAzw= github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04 h1:VzprUTpc0vW0nnNKJfJieyH/TZ9UYAnTZs5/gHTdAe8= github.com/anchore/go-testutils v0.0.0-20200925183923-d5f45b0d3c04/go.mod h1:6dK64g27Qi1qGQZ67gFmBFvEHScy0/C8qhQhNe5B5pQ= github.com/anchore/go-version v1.2.2-0.20200701162849-18adb9c92b9b h1:e1bmaoJfZVsCYMrIZBpFxwV26CbsuoEh5muXD5I1Ods= diff --git a/internal/file/digest.go b/internal/file/digest.go index 8c9932d9fe2..87c8c863afa 100644 --- a/internal/file/digest.go +++ b/internal/file/digest.go @@ -1,12 +1,14 @@ package file import ( + "context" "crypto" "fmt" "hash" "io" "strings" + "github.com/anchore/go-sync" "github.com/anchore/syft/syft/file" ) @@ -21,7 +23,7 @@ func supportedHashAlgorithms() []crypto.Hash { } } 
-func NewDigestsFromFile(closer io.ReadCloser, hashes []crypto.Hash) ([]file.Digest, error) { +func NewDigestsFromFile(ctx context.Context, closer io.ReadCloser, hashes []crypto.Hash) ([]file.Digest, error) { hashes = NormalizeHashes(hashes) // create a set of hasher objects tied together with a single writer to feed content into hashers := make([]hash.Hash, len(hashes)) @@ -31,7 +33,7 @@ func NewDigestsFromFile(closer io.ReadCloser, hashes []crypto.Hash) ([]file.Dige writers[idx] = hashers[idx] } - size, err := io.Copy(io.MultiWriter(writers...), closer) + size, err := io.Copy(newParallelWriter(sync.ContextExecutor(ctx), writers...), closer) if err != nil { return nil, err } diff --git a/internal/file/parallel_writer.go b/internal/file/parallel_writer.go new file mode 100644 index 00000000000..6beb9808b96 --- /dev/null +++ b/internal/file/parallel_writer.go @@ -0,0 +1,43 @@ +package file + +import ( + "errors" + "io" + "sync" + + gosync "github.com/anchore/go-sync" +) + +type parallelWriter struct { + executor gosync.Executor + writers []io.Writer +} + +func newParallelWriter(executor gosync.Executor, writers ...io.Writer) *parallelWriter { + return &parallelWriter{ + executor: executor, + writers: writers, + } +} + +func (w *parallelWriter) Write(p []byte) (int, error) { + errs := gosync.List[error]{} + wg := sync.WaitGroup{} + wg.Add(len(w.writers)) + for _, writer := range w.writers { + w.executor.Execute(func() { + defer wg.Done() + _, err := writer.Write(p) + if err != nil { + errs.Add(err) + } + }) + } + wg.Wait() + if errs.Len() > 0 { + return 0, errors.Join(errs.Values()...) 
+ } + return len(p), nil +} + +var _ io.Writer = (*parallelWriter)(nil) diff --git a/syft/file/cataloger/filedigest/cataloger.go b/syft/file/cataloger/filedigest/cataloger.go index f8aa9ace116..ffdabb6231c 100644 --- a/syft/file/cataloger/filedigest/cataloger.go +++ b/syft/file/cataloger/filedigest/cataloger.go @@ -50,7 +50,7 @@ func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordin prog := catalogingProgress(int64(len(locations))) for _, location := range locations { - result, err := i.catalogLocation(resolver, location) + result, err := i.catalogLocation(ctx, resolver, location) if errors.Is(err, ErrUndigestableFile) { continue @@ -83,7 +83,7 @@ func (i *Cataloger) Catalog(ctx context.Context, resolver file.Resolver, coordin return results, errs } -func (i *Cataloger) catalogLocation(resolver file.Resolver, location file.Location) ([]file.Digest, error) { +func (i *Cataloger) catalogLocation(ctx context.Context, resolver file.Resolver, location file.Location) ([]file.Digest, error) { meta, err := resolver.FileMetadataByLocation(location) if err != nil { return nil, err @@ -100,7 +100,7 @@ func (i *Cataloger) catalogLocation(resolver file.Resolver, location file.Locati } defer internal.CloseAndLogError(contentReader, location.AccessPath) - digests, err := intFile.NewDigestsFromFile(contentReader, i.hashes) + digests, err := intFile.NewDigestsFromFile(ctx, contentReader, i.hashes) if err != nil { return nil, internal.ErrPath{Context: "digests-cataloger", Path: location.RealPath, Err: err} } diff --git a/syft/pkg/cataloger/java/archive_parser.go b/syft/pkg/cataloger/java/archive_parser.go index e0ed965b212..7f9ccac6137 100644 --- a/syft/pkg/cataloger/java/archive_parser.go +++ b/syft/pkg/cataloger/java/archive_parser.go @@ -246,7 +246,7 @@ func (j *archiveParser) discoverMainPackage(ctx context.Context) (*pkg.Package, } // grab and assign digest for the entire archive - digests, err := getDigestsFromArchive(j.archivePath) + digests, err := 
getDigestsFromArchive(ctx, j.archivePath) if err != nil { return nil, err } @@ -472,7 +472,7 @@ func (j *archiveParser) discoverPkgsFromAllMavenFiles(ctx context.Context, paren return pkgs, nil } -func getDigestsFromArchive(archivePath string) ([]file.Digest, error) { +func getDigestsFromArchive(ctx context.Context, archivePath string) ([]file.Digest, error) { archiveCloser, err := os.Open(archivePath) if err != nil { return nil, fmt.Errorf("unable to open archive path (%s): %w", archivePath, err) @@ -480,7 +480,7 @@ func getDigestsFromArchive(archivePath string) ([]file.Digest, error) { defer internal.CloseAndLogError(archiveCloser, archivePath) // grab and assign digest for the entire archive - digests, err := intFile.NewDigestsFromFile(archiveCloser, javaArchiveHashes) + digests, err := intFile.NewDigestsFromFile(ctx, archiveCloser, javaArchiveHashes) if err != nil { log.Warnf("failed to create digest for file=%q: %+v", archivePath, err) } diff --git a/syft/source/filesource/file_source.go b/syft/source/filesource/file_source.go index caeec6e5545..a87ace55dd8 100644 --- a/syft/source/filesource/file_source.go +++ b/syft/source/filesource/file_source.go @@ -1,6 +1,7 @@ package filesource import ( + "context" "crypto" "fmt" "os" @@ -68,7 +69,7 @@ func New(cfg Config) (source.Source, error) { defer fh.Close() - digests, err = intFile.NewDigestsFromFile(fh, cfg.DigestAlgorithms) + digests, err = intFile.NewDigestsFromFile(context.TODO(), fh, cfg.DigestAlgorithms) if err != nil { return nil, fmt.Errorf("unable to calculate digests for file=%q: %w", cfg.Path, err) }