diff --git a/go.mod1 b/go.mod1 index 6fb40587e..cf1a454ce 100644 --- a/go.mod1 +++ b/go.mod1 @@ -54,4 +54,7 @@ require ( sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 ) -replace cloud.google.com/go/storage => github.com/3pointer/google-cloud-go/storage v1.6.1-0.20210108125931-b59bfa0720b2 +replace ( + cloud.google.com/go/storage => github.com/3pointer/google-cloud-go/storage v1.6.1-0.20210108125931-b59bfa0720b2 + github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 => github.com/recall704/kvproto v0.0.0-20210414071537-7bdfcba5f5d5 +) diff --git a/pkg/storage/flags.go b/pkg/storage/flags.go index 4dc54d31f..d24bb2376 100644 --- a/pkg/storage/flags.go +++ b/pkg/storage/flags.go @@ -9,12 +9,16 @@ import ( // DefineFlags adds flags to the flag set corresponding to all backend options. func DefineFlags(flags *pflag.FlagSet) { + DefineLocalFlags(flags) defineS3Flags(flags) defineGCSFlags(flags) } // ParseFromFlags obtains the backend options from the flag set. func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error { + if err := options.Local.parseFromFlags(flags); err != nil { + return errors.Trace(err) + } if err := options.S3.parseFromFlags(flags); err != nil { return errors.Trace(err) } diff --git a/pkg/storage/local.go b/pkg/storage/local.go index 5e1f233b8..b61eeaab4 100644 --- a/pkg/storage/local.go +++ b/pkg/storage/local.go @@ -9,7 +9,11 @@ import ( "os" "path/filepath" + "github.com/fujiwara/shapeio" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/backup" + backuppb "github.com/pingcap/kvproto/pkg/backup" + "github.com/spf13/pflag" ) const ( @@ -17,11 +21,40 @@ const ( localFilePerm os.FileMode = 0o644 ) +const ( + localRateLimitOption = "local.ratelimit" +) + // LocalStorage represents local file system storage. // // export for using in tests. type LocalStorage struct { - base string + base string + rateLimit uint64 +} + +// defineS3Flags defines the command line flags for S3BackendOptions. +func DefineLocalFlags(flags *pflag.FlagSet) { + flags.Uint64(localRateLimitOption, 0, "rate limit for dump/load data, unit is byte") +} + +type LocalOptions struct { + RateLimit uint64 `json:"ratelimit"` +} + +// Apply apply s3 options on backuppb.S3. +func (options *LocalOptions) Apply(local *backuppb.Local) error { + local.RateLimit = options.RateLimit + return nil +} + +func (options *LocalOptions) parseFromFlags(flags *pflag.FlagSet) error { + var err error + options.RateLimit, err = flags.GetUint64(localRateLimitOption) + if err != nil { + return errors.Trace(err) + } + return nil } // WriteFile writes data to a file to storage. @@ -97,7 +130,13 @@ func (l *LocalStorage) Create(ctx context.Context, name string) (ExternalFileWri return nil, errors.Trace(err) } buf := bufio.NewWriter(file) - return newFlushStorageWriter(buf, buf, file), nil + if l.rateLimit > 0 { + writer := shapeio.NewWriterWithContext(buf, ctx) + writer.SetRateLimit(float64(l.rateLimit)) // Byte/sec + return newFlushStorageWriter(writer, buf, file), nil + } else { + return newFlushStorageWriter(buf, buf, file), nil + } } func pathExists(_path string) (bool, error) { @@ -127,3 +166,17 @@ func NewLocalStorage(base string) (*LocalStorage, error) { } return &LocalStorage{base: base}, nil } + +func newLocalStorage(backend *backup.Local, opts *ExternalStorageOptions) (*LocalStorage, error) { + ok, err := pathExists(backend.Path) + if err != nil { + return nil, errors.Trace(err) + } + if !ok { + err := mkdirAll(backend.Path) + if err != nil { + return nil, errors.Trace(err) + } + } + return &LocalStorage{base: backend.Path, rateLimit: backend.RateLimit}, nil +} diff --git a/pkg/storage/parse.go b/pkg/storage/parse.go index 6447d7272..fda863ff9 100644 --- a/pkg/storage/parse.go +++ b/pkg/storage/parse.go @@ -18,8 +18,9 @@ import ( // BackendOptions further configures the storage backend not expressed by the // storage URL. type BackendOptions struct { - S3 S3BackendOptions `json:"s3" toml:"s3"` - GCS GCSBackendOptions `json:"gcs" toml:"gcs"` + Local LocalOptions `json:"local" toml:"local"` + S3 S3BackendOptions `json:"s3" toml:"s3"` + GCS GCSBackendOptions `json:"gcs" toml:"gcs"` } // ParseRawURL parse raw url to url object. @@ -52,10 +53,24 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBack return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "covert data-source-dir '%s' to absolute path failed", rawURL) } local := &backuppb.Local{Path: absPath} + if options == nil { + options = &BackendOptions{Local: LocalOptions{}} + } + ExtractQueryParameters(u, &options.Local) + if err := options.Local.Apply(local); err != nil { + return nil, errors.Trace(err) + } return &backuppb.StorageBackend{Backend: &backuppb.StorageBackend_Local{Local: local}}, nil case "local", "file": local := &backuppb.Local{Path: u.Path} + if options == nil { + options = &BackendOptions{Local: LocalOptions{}} + } + ExtractQueryParameters(u, &options.Local) + if err := options.Local.Apply(local); err != nil { + return nil, errors.Trace(err) + } return &backuppb.StorageBackend{Backend: &backuppb.StorageBackend_Local{Local: local}}, nil case "noop": diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index da8fdf5ee..5f249eca6 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -140,7 +140,7 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt if opts.SkipCheckPath { return &LocalStorage{base: backend.Local.Path}, nil } - return NewLocalStorage(backend.Local.Path) + return newLocalStorage(backend.Local, opts) case *backuppb.StorageBackend_S3: if backend.S3 == nil { return nil, errors.Annotate(berrors.ErrStorageInvalidConfig, "s3 config not found")