Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Lightning: Changed data-source-dir to use StorageBackend to prevent leaking secret access key to JSON output #1392

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/lightning/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Confi

cpdb.checkpoints.TaskCheckpoint = &checkpointspb.TaskCheckpointModel{
TaskId: cfg.TaskID,
SourceDir: cfg.Mydumper.SourceDir,
SourceDir: cfg.Mydumper.SourceDir.String(),
Backend: cfg.TikvImporter.Backend,
ImporterAddr: cfg.TikvImporter.Addr,
TidbHost: cfg.TiDB.Host,
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/checkpoints/glue_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (g GlueCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, d
defer dropPreparedStmt(s, stmtID)
_, err = s.ExecutePreparedStmt(c, stmtID, []types.Datum{
types.NewIntDatum(cfg.TaskID),
types.NewStringDatum(cfg.Mydumper.SourceDir),
types.NewStringDatum(cfg.Mydumper.SourceDir.String()),
types.NewStringDatum(cfg.TikvImporter.Backend),
types.NewStringDatum(cfg.TikvImporter.Addr),
types.NewStringDatum(cfg.TiDB.Host),
Expand Down
61 changes: 8 additions & 53 deletions pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"encoding/json"
"fmt"
"net"
"net/url"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -248,7 +247,7 @@ type MydumperRuntime struct {
ReadBlockSize ByteSize `toml:"read-block-size" json:"read-block-size"`
BatchSize ByteSize `toml:"batch-size" json:"batch-size"`
BatchImportRatio float64 `toml:"batch-import-ratio" json:"batch-import-ratio"`
SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
SourceDir SourceDir `toml:"data-source-dir" json:"data-source-dir"`
CharacterSet string `toml:"character-set" json:"character-set"`
CSV CSVConfig `toml:"csv" json:"csv"`
MaxRegionSize ByteSize `toml:"max-region-size" json:"max-region-size"`
Expand Down Expand Up @@ -551,13 +550,17 @@ func (cfg *Config) Adjust(ctx context.Context) error {
// adjust file routing
for _, rule := range cfg.Mydumper.FileRouters {
if filepath.IsAbs(rule.Path) {
relPath, err := filepath.Rel(cfg.Mydumper.SourceDir, rule.Path)
local := cfg.Mydumper.SourceDir.GetLocal()
if local == nil {
return errors.New("cannot use absolute file route path when data source dir is not `local://`")
}
relPath, err := filepath.Rel(local.Path, rule.Path)
if err != nil {
return errors.Trace(err)
}
// ".." means that this path is not in source dir, so we should return an error
if strings.HasPrefix(relPath, "..") {
return errors.Errorf("file route path '%s' is not in source dir '%s'", rule.Path, cfg.Mydumper.SourceDir)
return errors.Errorf("file route path '%s' is not in source dir '%s'", rule.Path, local.Path)
}
rule.Path = relPath
}
Expand Down Expand Up @@ -645,7 +648,7 @@ func (cfg *Config) Adjust(ctx context.Context) error {
}
cfg.AdjustMydumper()
cfg.AdjustCheckPoint()
return cfg.CheckAndAdjustFilePath()
return nil
}

func (cfg *Config) CheckAndAdjustForLocalBackend() error {
Expand Down Expand Up @@ -762,54 +765,6 @@ func (cfg *Config) CheckAndAdjustTiDBPort(ctx context.Context, mustHaveInternalC
return nil
}

func (cfg *Config) CheckAndAdjustFilePath() error {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this check-and-adjust does several things:

  1. if the data-source-dir is a path, we common.IsDirExists it. this is going to be replaced by the ListObjects permission check.
  2. turn the path into a URL. this comes for free by storage.ParseBackend.
  3. ban unsupported schemes. currently all schemes are supported (including GCS), so the check is useless now.

var u *url.URL

// An absolute Windows path like "C:\Users\XYZ" would be interpreted as
// an URL with scheme "C" and opaque data "\Users\XYZ".
// Therefore, we only perform URL parsing if we are sure the path is not
// an absolute Windows path.
// Here we use the `filepath.VolumeName` which can identify the "C:" part
// out of the path. On Linux this method always return an empty string.
// On Windows, the drive letter can only be single letters from "A:" to "Z:",
// so this won't mistake "S3:" as a Windows path.
if len(filepath.VolumeName(cfg.Mydumper.SourceDir)) == 0 {
var err error
u, err = url.Parse(cfg.Mydumper.SourceDir)
if err != nil {
return errors.Trace(err)
}
} else {
u = &url.URL{}
}

// convert path and relative path to a valid file url
if u.Scheme == "" {
if !common.IsDirExists(cfg.Mydumper.SourceDir) {
return errors.Errorf("%s: mydumper dir does not exist", cfg.Mydumper.SourceDir)
}
absPath, err := filepath.Abs(cfg.Mydumper.SourceDir)
if err != nil {
return errors.Annotatef(err, "covert data-source-dir '%s' to absolute path failed", cfg.Mydumper.SourceDir)
}
cfg.Mydumper.SourceDir = "file://" + filepath.ToSlash(absPath)
u.Path = absPath
u.Scheme = "file"
}

found := false
for _, t := range supportedStorageTypes {
if u.Scheme == t {
found = true
break
}
}
if !found {
return errors.Errorf("Unsupported data-source-dir url '%s'", cfg.Mydumper.SourceDir)
}
return nil
}

func (cfg *Config) AdjustCheckPoint() {
if len(cfg.Checkpoint.Schema) == 0 {
cfg.Checkpoint.Schema = "tidb_lightning_checkpoint"
Expand Down
34 changes: 28 additions & 6 deletions pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config_test
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"net"
Expand Down Expand Up @@ -64,7 +65,7 @@ func assignMinimalLegalValue(cfg *config.Config) {
cfg.TiDB.Port = 4567
cfg.TiDB.StatusPort = 8901
cfg.TiDB.PdAddr = "234.56.78.90:12345"
cfg.Mydumper.SourceDir = "file://."
cfg.Mydumper.SourceDir = config.NewSourceDirFromPath(".")
cfg.TikvImporter.Backend = config.BackendLocal
cfg.TikvImporter.SortedKVDir = "."
cfg.TikvImporter.DiskQuota = 1
Expand All @@ -79,7 +80,7 @@ func (s *configTestSuite) TestAdjustPdAddrAndPort(c *C) {
cfg := config.NewConfig()
cfg.TiDB.Host = host
cfg.TiDB.StatusPort = port
cfg.Mydumper.SourceDir = "."
cfg.Mydumper.SourceDir = config.NewSourceDirFromPath(".")
cfg.TikvImporter.Backend = config.BackendLocal
cfg.TikvImporter.SortedKVDir = "."

Expand All @@ -98,7 +99,7 @@ func (s *configTestSuite) TestAdjustPdAddrAndPortViaAdvertiseAddr(c *C) {
cfg := config.NewConfig()
cfg.TiDB.Host = host
cfg.TiDB.StatusPort = port
cfg.Mydumper.SourceDir = "."
cfg.Mydumper.SourceDir = config.NewSourceDirFromPath(".")
cfg.TikvImporter.Backend = config.BackendLocal
cfg.TikvImporter.SortedKVDir = "."

Expand Down Expand Up @@ -156,7 +157,7 @@ func (s *configTestSuite) TestAdjustFileRoutePath(c *C) {

ctx := context.Background()
tmpDir := c.MkDir()
cfg.Mydumper.SourceDir = tmpDir
cfg.Mydumper.SourceDir = config.NewSourceDirFromPath(tmpDir)
invalidPath := filepath.Join(tmpDir, "../test123/1.sql")
rule := &config.FileRouteRule{Path: invalidPath, Type: "sql", Schema: "test", Table: "tbl"}
cfg.Mydumper.FileRouters = []*config.FileRouteRule{rule}
Expand Down Expand Up @@ -438,7 +439,7 @@ func (s *configTestSuite) TestInvalidCSV(c *C) {
comment := Commentf("input = %s", tc.input)

cfg := config.NewConfig()
cfg.Mydumper.SourceDir = "file://."
cfg.Mydumper.SourceDir = config.NewSourceDirFromPath(".")
cfg.TiDB.Port = 4000
cfg.TiDB.PdAddr = "test.invalid:2379"
cfg.TikvImporter.Backend = config.BackendLocal
Expand Down Expand Up @@ -532,7 +533,7 @@ func (s *configTestSuite) TestLoadConfig(c *C) {
c.Assert(cfg.TiDB.User, Equals, "guest")
c.Assert(cfg.TiDB.Psw, Equals, "12345")
c.Assert(cfg.TiDB.PdAddr, Equals, "172.16.30.11:2379,172.16.30.12:2379")
c.Assert(cfg.Mydumper.SourceDir, Equals, path)
c.Assert(cfg.Mydumper.SourceDir.String(), Equals, "local://"+filepath.ToSlash(path))
c.Assert(cfg.TikvImporter.Backend, Equals, config.BackendLocal)
c.Assert(cfg.TikvImporter.SortedKVDir, Equals, ".")
c.Assert(cfg.PostRestore.Checksum, Equals, config.OpLevelOff)
Expand All @@ -554,6 +555,27 @@ func (s *configTestSuite) TestLoadConfig(c *C) {
c.Assert(result, Matches, `.*"pd-addr":"172.16.30.11:2379,172.16.30.12:2379".*`)
}

func (s *configTestSuite) TestSourceDirSecrets(c *C) {
cfg := &config.Config{}
err := cfg.LoadFromTOML([]byte(`
[mydumper]
data-source-dir = 's3://bucket/path?access-key=aaaaaaa&secret-access-key=bbbbbbb'
`))
c.Assert(err, IsNil)
c.Assert(cfg.Mydumper.SourceDir.String(), Equals, "s3://bucket/path")
s3 := cfg.Mydumper.SourceDir.GetS3()
c.Assert(s3, NotNil)
c.Assert(s3.Bucket, Equals, "bucket")
c.Assert(s3.Prefix, Equals, "path")
c.Assert(s3.AccessKey, Equals, "aaaaaaa")
c.Assert(s3.SecretAccessKey, Equals, "bbbbbbb")

// ensure the secrets are not leaked through the json serialization.
j, err := json.Marshal(cfg)
c.Assert(err, IsNil)
c.Assert(string(j), Matches, `.*"data-source-dir":"s3://bucket/path".*`)
}

func (s *configTestSuite) TestDefaultImporterBackendValue(c *C) {
cfg := config.NewConfig()
assignMinimalLegalValue(cfg)
Expand Down
9 changes: 7 additions & 2 deletions pkg/lightning/config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/errors"

"github.com/pingcap/br/pkg/lightning/log"
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/br/pkg/version/build"
)

Expand All @@ -50,7 +51,7 @@ type GlobalTiDB struct {
}

type GlobalMydumper struct {
SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
SourceDir SourceDir `toml:"data-source-dir" json:"data-source-dir"`
// Deprecated
NoSchema bool `toml:"no-schema" json:"no-schema"`
Filter []string `toml:"filter" json:"filter"`
Expand Down Expand Up @@ -225,7 +226,11 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
cfg.TiDB.PdAddr = *pdAddr
}
if *dataSrcPath != "" {
cfg.Mydumper.SourceDir = *dataSrcPath
var err error
cfg.Mydumper.SourceDir.StorageBackend, err = storage.ParseBackend(*dataSrcPath, nil)
if err != nil {
return nil, err
}
}
if *importerAddr != "" {
cfg.TikvImporter.Addr = *importerAddr
Expand Down
55 changes: 55 additions & 0 deletions pkg/lightning/config/sourcedir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

package config

import (
"context"
"encoding/json"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/backup"

"github.com/pingcap/br/pkg/storage"
)

// SourceDir is the parsed data source directory. It hides the "access-key" and
// "secret-access-key" when dumped as JSON.
type SourceDir struct {
*backuppb.StorageBackend
}

// String implements fmt.Stringer
func (sd SourceDir) String() string {
// need the intermediate variable to avoid "cannot call pointer method" compiler error.
u := storage.FormatBackendURL(sd.StorageBackend)
return u.String()
}

func (sd *SourceDir) UnmarshalText(text []byte) error {
var err error
sd.StorageBackend, err = storage.ParseBackend(string(text), nil)
return errors.Trace(err)
}

func (sd SourceDir) MarshalJSON() ([]byte, error) {
return json.Marshal(sd.String())
}

// NewStorage creates a new external storage interface from the storage backend
// URL. It is basically a wrapper around `storage.New()` function.
func (sd SourceDir) NewStorage(ctx context.Context, checkPermissions []storage.Permission) (storage.ExternalStorage, error) {
return storage.New(
ctx, sd.StorageBackend,
&storage.ExternalStorageOptions{CheckPermissions: checkPermissions},
)
}

// NewSourceDirFromPath creates a SourceDir from a file path.
// This should only be used in test.
func NewSourceDirFromPath(path string) SourceDir {
return SourceDir{
StorageBackend: &backuppb.StorageBackend{
Backend: &backuppb.StorageBackend_Local{Local: &backuppb.Local{Path: path}},
},
}
}
18 changes: 9 additions & 9 deletions pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,15 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
}
}()

storagePermissions := []storage.Permission{storage.ListObjects, storage.GetObject}
if !taskCfg.App.CheckRequirements {
storagePermissions = nil
}
s, err := taskCfg.Mydumper.SourceDir.NewStorage(ctx, storagePermissions)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this replaces the StoragePermission pre-check

if err != nil {
return errors.Annotatef(err, "cannot open storage at '%s'", taskCfg.Mydumper.SourceDir)
}

// initiation of default glue should be after RegisterMySQL, which is ready to be called after taskCfg.Adjust
// and also put it here could avoid injecting another two SkipRunTask failpoint to caller
if g == nil {
Expand All @@ -269,15 +278,6 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
g = glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode)
}

u, err := storage.ParseBackend(taskCfg.Mydumper.SourceDir, nil)
if err != nil {
return errors.Annotate(err, "parse backend failed")
}
s, err := storage.New(ctx, u, &storage.ExternalStorageOptions{})
if err != nil {
return errors.Annotate(err, "create storage failed")
}

loadTask := log.L().Begin(zap.InfoLevel, "load data source")
var mdl *mydump.MDLoader
mdl, err = mydump.NewMyDumpLoaderWithStore(ctx, taskCfg, s)
Expand Down
10 changes: 5 additions & 5 deletions pkg/lightning/lightning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,22 @@ func (s *lightningSuite) TestRun(c *C) {
globalConfig.TiDB.Host = "test.invalid"
globalConfig.TiDB.Port = 4000
globalConfig.TiDB.PdAddr = "test.invalid:2379"
globalConfig.Mydumper.SourceDir = "not-exists"
globalConfig.Mydumper.SourceDir = config.NewSourceDirFromPath("not-exists")
globalConfig.TikvImporter.Backend = config.BackendLocal
globalConfig.TikvImporter.SortedKVDir = c.MkDir()
lightning := New(globalConfig)
cfg := config.NewConfig()
err := cfg.LoadFromGlobal(globalConfig)
c.Assert(err, IsNil)
err = lightning.RunOnce(context.Background(), cfg, nil)
c.Assert(err, ErrorMatches, ".*mydumper dir does not exist")
c.Assert(err, ErrorMatches, ".*(no such file or directory|The system cannot find the file specified).*")

path, _ := filepath.Abs(".")
ctx := context.Background()
invalidGlue := glue.NewExternalTiDBGlue(nil, 0)
err = lightning.run(ctx, &config.Config{
Mydumper: config.MydumperRuntime{
SourceDir: "file://" + filepath.ToSlash(path),
SourceDir: config.NewSourceDirFromPath(path),
Filter: []string{"*.*"},
DefaultFileRules: true,
},
Expand All @@ -90,7 +90,7 @@ func (s *lightningSuite) TestRun(c *C) {

err = lightning.run(ctx, &config.Config{
Mydumper: config.MydumperRuntime{
SourceDir: ".",
SourceDir: config.NewSourceDirFromPath("."),
Filter: []string{"*.*"},
},
Checkpoint: config.Checkpoint{
Expand All @@ -116,7 +116,7 @@ func (s *lightningServerSuite) SetUpTest(c *C) {
cfg.TiDB.PdAddr = "test.invalid:2379"
cfg.App.ServerMode = true
cfg.App.StatusAddr = "127.0.0.1:0"
cfg.Mydumper.SourceDir = "file://."
cfg.Mydumper.SourceDir = config.NewSourceDirFromPath(".")
cfg.TikvImporter.Backend = config.BackendLocal
cfg.TikvImporter.SortedKVDir = c.MkDir()

Expand Down
Loading