From 9e2eb7f88ef04fa04e6fe9acbeb4945efb532ecb Mon Sep 17 00:00:00 2001 From: Vladimir Buyanov Date: Thu, 25 Jan 2024 15:52:17 +0200 Subject: [PATCH] Fix a few bugs --- README.md | 1 + cli/cli.go | 1 + cli/setup.go | 4 ++-- pipeline/collection/misc.go | 10 ++++++---- storage/s3/opts.go | 11 +++++++++++ storage/s3/s3.go | 38 +++++++++++++++++++------------------ storage/utils.go | 11 +++++++++++ 7 files changed, 52 insertions(+), 24 deletions(-) create mode 100644 storage/s3/opts.go diff --git a/README.md b/README.md index d232670..b8f8a6f 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ With 128 workers we get avg sync speed around 2k obj/sec (small objects 1-20 kb) * Each object is loaded into RAM. So you need ` * ` RAM. If you don't have enough RAM, you can use swap. A large (32-64 Gb) swap on SSD does not affect the tool performance. This happened because the tool was designed to synchronize billions of small files and optimized for this workload. + To avoid this you can use streaming storage drivers (now available only for S3 and FS). It uses less RAM, but is slower on small objects. 
## Usage ``` diff --git a/cli/cli.go b/cli/cli.go index b164801..8c4cd0b 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -97,6 +97,7 @@ type args struct { DisableHTTP2 bool `arg:"--disable-http2" help:"Disable HTTP2 for http client"` ListBuffer uint `arg:"--list-buffer" help:"Size of list buffer" default:"1000"` SkipSSLVerify bool `arg:"--skip-ssl-verify" help:"Disable SSL verification for S3"` + ServerGzip bool `arg:"--server-gzip" help:"Workaround for S3 servers with enabled gzip compression for all files."` Profiler bool `arg:"--profiler" help:"Enable profiler on :8080"` // Rate Limit RateLimitObjPerSec uint `arg:"--ratelimit-objects" help:"Rate limit objects per second"` diff --git a/cli/setup.go b/cli/setup.go index f159eac..d667d28 100644 --- a/cli/setup.go +++ b/cli/setup.go @@ -21,7 +21,7 @@ func setupStorages(ctx context.Context, syncGroup *pipeline.Group, cli *argsPars switch cli.Source.Type { case storage.TypeS3: sourceStorage = s3.NewS3Storage(cli.SourceNoSign, cli.SourceKey, cli.SourceSecret, cli.SourceToken, cli.SourceRegion, cli.SourceEndpoint, - cli.Source.Bucket, cli.Source.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval, cli.SkipSSLVerify, + cli.Source.Bucket, cli.Source.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval, cli.SkipSSLVerify, cli.ServerGzip, ) case storage.TypeS3Stream: sourceStorage = s3stream.NewS3StreamStorage(cli.SourceNoSign, cli.SourceKey, cli.SourceSecret, cli.SourceToken, cli.SourceRegion, cli.SourceEndpoint, @@ -39,7 +39,7 @@ func setupStorages(ctx context.Context, syncGroup *pipeline.Group, cli *argsPars switch cli.Target.Type { case storage.TypeS3: targetStorage = s3.NewS3Storage(cli.TargetNoSign, cli.TargetKey, cli.TargetSecret, cli.TargetToken, cli.TargetRegion, cli.TargetEndpoint, - cli.Target.Bucket, cli.Target.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval, cli.SkipSSLVerify, + cli.Target.Bucket, cli.Target.Path, cli.S3KeysPerReq, cli.S3Retry, cli.S3RetryInterval, cli.SkipSSLVerify, 
cli.ServerGzip, ) case storage.TypeS3Stream: targetStorage = s3stream.NewS3StreamStorage(cli.TargetNoSign, cli.TargetKey, cli.TargetSecret, cli.TargetToken, cli.TargetRegion, cli.TargetEndpoint, diff --git a/pipeline/collection/misc.go b/pipeline/collection/misc.go index 46b3de8..318f849 100644 --- a/pipeline/collection/misc.go +++ b/pipeline/collection/misc.go @@ -2,9 +2,10 @@ package collection import ( "github.com/larrabee/ratelimit" + "github.com/sirupsen/logrus" + "github.com/larrabee/s3sync/pipeline" "github.com/larrabee/s3sync/storage" - "github.com/sirupsen/logrus" ) // Terminator like a /dev/null @@ -27,9 +28,10 @@ var Logger pipeline.StepFn = func(group *pipeline.Group, stepNum int, input <-ch for obj := range input { if ok { cfg.WithFields(logrus.Fields{ - "key": *obj.Key, - "size": *obj.ContentLength, - "Content-Type": *obj.ContentType, + "key": storage.ToValue(obj.Key), + "size": storage.ToValue(obj.ContentLength), + "Content-Type": storage.ToValue(obj.ContentType), + "Content-Encoding": storage.ToValue(obj.ContentEncoding), }).Infof("Sync file") output <- obj } diff --git a/storage/s3/opts.go b/storage/s3/opts.go new file mode 100644 index 0000000..df49d91 --- /dev/null +++ b/storage/s3/opts.go @@ -0,0 +1,11 @@ +package s3 + +import ( + "github.com/aws/aws-sdk-go/aws/request" +) + +func withAcceptEncoding(e string) request.Option { + return func(r *request.Request) { + r.HTTPRequest.Header.Add("Accept-Encoding", e) + } +} diff --git a/storage/s3/s3.go b/storage/s3/s3.go index a5a798f..83a00da 100644 --- a/storage/s3/s3.go +++ b/storage/s3/s3.go @@ -5,13 +5,14 @@ import ( "context" "crypto/tls" "errors" - "github.com/aws/aws-sdk-go/aws/request" "io" "net/http" "net/url" "strings" "time" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/defaults" @@ -34,12 +35,13 @@ type S3Storage struct { ctx context.Context listMarker *string rlBucket ratelimit.Bucket 
+ serverGzip bool } // NewS3Storage return new configured S3 storage. // // You should always create new storage with this constructor. -func NewS3Storage(awsNoSign bool, awsAccessKey, awsSecretKey, awsToken, awsRegion, endpoint, bucketName, prefix string, keysPerReq int64, retryCnt uint, retryDelay time.Duration, skipSSLVerify bool) *S3Storage { +func NewS3Storage(awsNoSign bool, awsAccessKey, awsSecretKey, awsToken, awsRegion, endpoint, bucketName, prefix string, keysPerReq int64, retryCnt uint, retryDelay time.Duration, skipSSLVerify bool, serverGzip bool) *S3Storage { sess := session.Must(session.NewSessionWithOptions(session.Options{ SharedConfigState: session.SharedConfigEnable, })) @@ -83,6 +85,7 @@ func NewS3Storage(awsNoSign bool, awsAccessKey, awsSecretKey, awsToken, awsRegio retryInterval: retryDelay, ctx: context.TODO(), rlBucket: ratelimit.NewFakeBucket(), + serverGzip: serverGzip, } return &st @@ -105,7 +108,7 @@ func (st *S3Storage) WithRateLimit(limit int) error { // List S3 bucket and send founded objects to chan. 
func (st *S3Storage) List(output chan<- *storage.Object) error { - listObjectsFn := func(p *s3.ListObjectsOutput, lastPage bool) bool { + listObjectsFn := func(p *s3.ListObjectsV2Output, lastPage bool) bool { for _, o := range p.Contents { key, _ := url.QueryUnescape(aws.StringValue(o.Key)) key = strings.Replace(key, st.prefix, "", 1) @@ -117,19 +120,19 @@ func (st *S3Storage) List(output chan<- *storage.Object) error { IsLatest: aws.Bool(true), } } - st.listMarker = p.Marker + st.listMarker = p.NextContinuationToken return !lastPage // continue paging } - input := &s3.ListObjectsInput{ - Bucket: st.awsBucket, - Prefix: aws.String(st.prefix), - MaxKeys: aws.Int64(st.keysPerReq), - EncodingType: aws.String(s3.EncodingTypeUrl), - Marker: st.listMarker, + input := &s3.ListObjectsV2Input{ + Bucket: st.awsBucket, + Prefix: aws.String(st.prefix), + MaxKeys: aws.Int64(st.keysPerReq), + EncodingType: aws.String(s3.EncodingTypeUrl), + ContinuationToken: st.listMarker, } - if err := st.awsSvc.ListObjectsPagesWithContext(st.ctx, input, listObjectsFn); err != nil { + if err := st.awsSvc.ListObjectsV2PagesWithContext(st.ctx, input, listObjectsFn); err != nil { return err } storage.Log.Debugf("Listing bucket finished") @@ -191,12 +194,6 @@ func (st *S3Storage) PutObject(obj *storage.Object) error { return nil } -func withAcceptEncoding(e string) request.Option { - return func(r *request.Request) { - r.HTTPRequest.Header.Add("Accept-Encoding", e) - } -} - // GetObjectContent read object content and metadata from S3. 
func (st *S3Storage) GetObjectContent(obj *storage.Object) error { input := &s3.GetObjectInput{ @@ -205,7 +202,12 @@ func (st *S3Storage) GetObjectContent(obj *storage.Object) error { VersionId: obj.VersionId, } - result, err := st.awsSvc.GetObjectWithContext(st.ctx, input, withAcceptEncoding("gzip")) + opts := make([]request.Option, 0, 1) + if !st.serverGzip { + opts = append(opts, withAcceptEncoding("gzip")) + } + + result, err := st.awsSvc.GetObjectWithContext(st.ctx, input, opts...) if err != nil { return err } diff --git a/storage/utils.go b/storage/utils.go index b02cff5..612cc1c 100644 --- a/storage/utils.go +++ b/storage/utils.go @@ -58,3 +58,14 @@ func GetInsecureRandString(n int) string { return sb.String() } + +func ToPtr[K any](val K) *K { + return &val +} + +func ToValue[K any](val *K) K { + if val == nil { + return *new(K) + } + return *val +}