Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

add rate limit for local backend, parse limit args #1066

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion go.mod1
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,7 @@ require (
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0
)

replace cloud.google.com/go/storage => github.com/3pointer/google-cloud-go/storage v1.6.1-0.20210108125931-b59bfa0720b2
replace (
cloud.google.com/go/storage => github.com/3pointer/google-cloud-go/storage v1.6.1-0.20210108125931-b59bfa0720b2
github.com/pingcap/kvproto v0.0.0-20210308063835-39b884695fb8 => github.com/recall704/kvproto v0.0.0-20210414071537-7bdfcba5f5d5
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pingcap/kvproto#756 PR required

)
4 changes: 4 additions & 0 deletions pkg/storage/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import (

// DefineFlags adds flags to the flag set corresponding to all backend options.
func DefineFlags(flags *pflag.FlagSet) {
DefineLocalFlags(flags)
defineS3Flags(flags)
defineGCSFlags(flags)
}

// ParseFromFlags obtains the backend options from the flag set.
func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error {
if err := options.Local.parseFromFlags(flags); err != nil {
return errors.Trace(err)
}
if err := options.S3.parseFromFlags(flags); err != nil {
return errors.Trace(err)
}
Expand Down
57 changes: 55 additions & 2 deletions pkg/storage/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,52 @@ import (
"os"
"path/filepath"

"github.com/fujiwara/shapeio"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/backup"
backuppb "github.com/pingcap/kvproto/pkg/backup"
"github.com/spf13/pflag"
)

const (
localDirPerm os.FileMode = 0o755
localFilePerm os.FileMode = 0o644
)

const (
localRateLimitOption = "local.ratelimit"
)

// LocalStorage represents local file system storage.
//
// export for using in tests.
type LocalStorage struct {
base string
base string
rateLimit uint64
}

// defineS3Flags defines the command line flags for S3BackendOptions.
func DefineLocalFlags(flags *pflag.FlagSet) {
flags.Uint64(localRateLimitOption, 0, "rate limit for dump/load data, unit is byte")
}

type LocalOptions struct {
RateLimit uint64 `json:"ratelimit"`
}

// Apply apply s3 options on backuppb.S3.
func (options *LocalOptions) Apply(local *backuppb.Local) error {
local.RateLimit = options.RateLimit
return nil
}

func (options *LocalOptions) parseFromFlags(flags *pflag.FlagSet) error {
var err error
options.RateLimit, err = flags.GetUint64(localRateLimitOption)
if err != nil {
return errors.Trace(err)
}
return nil
}

// WriteFile writes data to a file to storage.
Expand Down Expand Up @@ -97,7 +130,13 @@ func (l *LocalStorage) Create(ctx context.Context, name string) (ExternalFileWri
return nil, errors.Trace(err)
}
buf := bufio.NewWriter(file)
return newFlushStorageWriter(buf, buf, file), nil
if l.rateLimit > 0 {
writer := shapeio.NewWriterWithContext(buf, ctx)
writer.SetRateLimit(float64(l.rateLimit)) // Byte/sec
return newFlushStorageWriter(writer, buf, file), nil
} else {
return newFlushStorageWriter(buf, buf, file), nil
}
}

func pathExists(_path string) (bool, error) {
Expand Down Expand Up @@ -127,3 +166,17 @@ func NewLocalStorage(base string) (*LocalStorage, error) {
}
return &LocalStorage{base: base}, nil
}

func newLocalStorage(backend *backup.Local, opts *ExternalStorageOptions) (*LocalStorage, error) {
ok, err := pathExists(backend.Path)
if err != nil {
return nil, errors.Trace(err)
}
if !ok {
err := mkdirAll(backend.Path)
if err != nil {
return nil, errors.Trace(err)
}
}
return &LocalStorage{base: backend.Path, rateLimit: backend.RateLimit}, nil
}
19 changes: 17 additions & 2 deletions pkg/storage/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
// BackendOptions further configures the storage backend not expressed by the
// storage URL.
type BackendOptions struct {
S3 S3BackendOptions `json:"s3" toml:"s3"`
GCS GCSBackendOptions `json:"gcs" toml:"gcs"`
Local LocalOptions `json:"local" toml:"local"`
S3 S3BackendOptions `json:"s3" toml:"s3"`
GCS GCSBackendOptions `json:"gcs" toml:"gcs"`
}

// ParseRawURL parse raw url to url object.
Expand Down Expand Up @@ -52,10 +53,24 @@ func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBack
return nil, errors.Annotatef(berrors.ErrStorageInvalidConfig, "covert data-source-dir '%s' to absolute path failed", rawURL)
}
local := &backuppb.Local{Path: absPath}
if options == nil {
options = &BackendOptions{Local: LocalOptions{}}
}
ExtractQueryParameters(u, &options.Local)
if err := options.Local.Apply(local); err != nil {
return nil, errors.Trace(err)
}
return &backuppb.StorageBackend{Backend: &backuppb.StorageBackend_Local{Local: local}}, nil

case "local", "file":
local := &backuppb.Local{Path: u.Path}
if options == nil {
options = &BackendOptions{Local: LocalOptions{}}
}
ExtractQueryParameters(u, &options.Local)
if err := options.Local.Apply(local); err != nil {
return nil, errors.Trace(err)
}
return &backuppb.StorageBackend{Backend: &backuppb.StorageBackend_Local{Local: local}}, nil

case "noop":
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalSt
if opts.SkipCheckPath {
return &LocalStorage{base: backend.Local.Path}, nil
}
return NewLocalStorage(backend.Local.Path)
return newLocalStorage(backend.Local, opts)
case *backuppb.StorageBackend_S3:
if backend.S3 == nil {
return nil, errors.Annotate(berrors.ErrStorageInvalidConfig, "s3 config not found")
Expand Down