Skip to content

Commit

Permalink
server: when init data dir, skip if get changefeed info failed. (#2778)…
Browse files Browse the repository at this point in the history
… (#2787)

* This is an automated cherry-pick of #2778

Signed-off-by: ti-chi-bot <[email protected]>

* fix conflicts

Signed-off-by: Neil Shen <[email protected]>

Co-authored-by: Ling Jin <[email protected]>
Co-authored-by: Neil Shen <[email protected]>
  • Loading branch information
3 people authored Sep 13, 2021
1 parent c4e9321 commit 11e574c
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 37 deletions.
18 changes: 18 additions & 0 deletions cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,24 @@ func (c CDCEtcdClient) GetChangeFeeds(ctx context.Context) (int64, map[string]*m
return revision, details, nil
}

// GetAllChangeFeedInfo queries all changefeed information
func (c CDCEtcdClient) GetAllChangeFeedInfo(ctx context.Context) (map[string]*model.ChangeFeedInfo, error) {
_, details, err := c.GetChangeFeeds(ctx)
if err != nil {
return nil, errors.Trace(err)
}
allFeedInfo := make(map[string]*model.ChangeFeedInfo, len(details))
for id, rawDetail := range details {
info := &model.ChangeFeedInfo{}
if err := info.Unmarshal(rawDetail.Value); err != nil {
return nil, errors.Trace(err)
}
allFeedInfo[id] = info
}

return allFeedInfo, nil
}

// GetChangeFeedInfo queries the config of a given changefeed
func (c CDCEtcdClient) GetChangeFeedInfo(ctx context.Context, id string) (*model.ChangeFeedInfo, error) {
key := GetEtcdKeyChangeFeedInfo(id)
Expand Down
40 changes: 40 additions & 0 deletions cdc/kv/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,46 @@ func (s *etcdSuite) TestOpChangeFeedDetail(c *check.C) {
c.Assert(cerror.ErrChangeFeedNotExists.Equal(err), check.IsTrue)
}

func (s etcdSuite) TestGetAllChangeFeedInfo(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

ctx := context.Background()
infos := []struct {
id string
info *model.ChangeFeedInfo
}{
{
id: "a",
info: &model.ChangeFeedInfo{
SinkURI: "root@tcp(127.0.0.1:3306)/mysql",
SortDir: "/old-version/sorter",
},
},
{
id: "b",
info: &model.ChangeFeedInfo{
SinkURI: "root@tcp(127.0.0.1:4000)/mysql",
},
},
}

for _, item := range infos {
err := s.client.SaveChangeFeedInfo(ctx, item.info, item.id)
c.Assert(err, check.IsNil)
}

allChangFeedInfo, err := s.client.GetAllChangeFeedInfo(ctx)
c.Assert(err, check.IsNil)

for _, item := range infos {
obtained, found := allChangFeedInfo[item.id]
c.Assert(found, check.IsTrue)
c.Assert(item.info.SinkURI, check.Equals, obtained.SinkURI)
c.Assert(item.info.SortDir, check.Equals, obtained.SortDir)
}
}

func (s *etcdSuite) TestRemoveAllTaskXXX(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)
Expand Down
10 changes: 3 additions & 7 deletions cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,17 +427,13 @@ func (s *Server) setUpDataDir(ctx context.Context) error {
}

// data-dir will be decide by exist changefeed for backward compatibility
allStatus, err := cli.GetAllChangeFeedStatus(ctx)
allInfo, err := cli.GetAllChangeFeedInfo(ctx)
if err != nil {
return errors.Trace(err)
}

candidates := make([]string, 0, len(allStatus))
for id := range allStatus {
info, err := cli.GetChangeFeedInfo(ctx, id)
if err != nil {
return errors.Trace(err)
}
candidates := make([]string, 0, len(allInfo))
for _, info := range allInfo {
if info.SortDir != "" {
candidates = append(candidates, info.SortDir)
}
Expand Down
81 changes: 51 additions & 30 deletions cdc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ import (
"time"

"github.com/pingcap/check"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/util/testleak"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"golang.org/x/sync/errgroup"
)

type serverSuite struct {
server *Server
e *embed.Etcd
clientURL *url.URL
ctx context.Context
Expand All @@ -37,15 +41,35 @@ type serverSuite struct {
}

func (s *serverSuite) SetUpTest(c *check.C) {
dir := c.MkDir()
var err error
dir := c.MkDir()
s.clientURL, s.e, err = etcd.SetupEmbedEtcd(dir)
c.Assert(err, check.IsNil)

pdEndpoints := []string{
"http://" + s.clientURL.Host,
"http://invalid-pd-host:2379",
}
server, err := NewServer(pdEndpoints)
c.Assert(err, check.IsNil)
c.Assert(server, check.NotNil)
s.server = server

s.ctx, s.cancel = context.WithCancel(context.Background())
client, err := clientv3.New(clientv3.Config{
Endpoints: s.server.pdEndpoints,
Context: s.ctx,
DialTimeout: 5 * time.Second,
})
c.Assert(err, check.IsNil)
etcdClient := kv.NewCDCEtcdClient(s.ctx, client)
s.server.etcdClient = &etcdClient

s.errg = util.HandleErrWithErrGroup(s.ctx, s.e.Err(), func(e error) { c.Log(e) })
}

func (s *serverSuite) TearDownTest(c *check.C) {
s.server.Close()
s.e.Close()
s.cancel()
err := s.errg.Wait()
Expand All @@ -60,52 +84,49 @@ func (s *serverSuite) TestEtcdHealthChecker(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

ctx, cancel := context.WithCancel(context.Background())
pdEndpoints := []string{
"http://" + s.clientURL.Host,
"http://invalid-pd-host:2379",
}
server, err := NewServer(pdEndpoints)
c.Assert(err, check.IsNil)
c.Assert(server, check.NotNil)

s.errg.Go(func() error {
err := server.etcdHealthChecker(ctx)
err := s.server.etcdHealthChecker(s.ctx)
c.Assert(err, check.Equals, context.Canceled)
return nil
})
// longer than one check tick 3s
time.Sleep(time.Second * 4)
cancel()
s.cancel()
}

func (s *serverSuite) TestInitDataDir(c *check.C) {
func (s *serverSuite) TestSetUpDataDir(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

ctx, cancel := context.WithCancel(context.Background())
pdEndpoints := []string{
"http://" + s.clientURL.Host,
"http://invalid-pd-host:2379",
}
server, err := NewServer(pdEndpoints)
conf := config.GetGlobalServerConfig()
// DataDir is not set, and no changefeed exist, use the default
conf.DataDir = ""
err := s.server.setUpDataDir(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(server, check.NotNil)
c.Assert(conf.DataDir, check.Equals, defaultDataDir)
c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(defaultDataDir, config.DefaultSortDir))

conf := config.GetGlobalServerConfig()
conf.DataDir = c.MkDir()
// DataDir is not set, but has existed changefeed, use the one with the largest available space
conf.DataDir = ""
dir := c.MkDir()
err = s.server.etcdClient.SaveChangeFeedInfo(s.ctx, &model.ChangeFeedInfo{SortDir: dir}, "a")
c.Assert(err, check.IsNil)

err = server.initDataDir(ctx)
err = s.server.etcdClient.SaveChangeFeedInfo(s.ctx, &model.ChangeFeedInfo{}, "b")
c.Assert(err, check.IsNil)
c.Assert(conf.DataDir, check.Not(check.Equals), "")
c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(conf.DataDir, "/tmp/sorter"))
config.StoreGlobalServerConfig(conf)

server.etcdClient = nil
conf.DataDir = ""
err = server.initDataDir(ctx)
err = s.server.setUpDataDir(s.ctx)
c.Assert(err, check.IsNil)

c.Assert(conf.DataDir, check.Equals, dir)
c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(dir, config.DefaultSortDir))

conf.DataDir = c.MkDir()
// DataDir has been set, just use it
err = s.server.setUpDataDir(s.ctx)
c.Assert(err, check.IsNil)
c.Assert(conf.DataDir, check.Not(check.Equals), "")
c.Assert(conf.Sorter.SortDir, check.Equals, filepath.Join(conf.DataDir, config.DefaultSortDir))

cancel()
s.cancel()
}

0 comments on commit 11e574c

Please sign in to comment.