diff --git a/DEPS.bzl b/DEPS.bzl
index c0e6c310b416b..6687e1334aac8 100644
--- a/DEPS.bzl
+++ b/DEPS.bzl
@@ -5867,13 +5867,13 @@ def go_deps():
         name = "com_github_pingcap_kvproto",
         build_file_proto_mode = "disable_global",
         importpath = "github.com/pingcap/kvproto",
-        sha256 = "92a67bcc499c06fd3d76cc153362540b22eaf1b09c4bda62a1599ce876b8ed78",
-        strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20241120071417-b5b7843d9037",
+        sha256 = "db08607b0c90f3909b66577e9c568d0cbd6b2825d287d7b5caab86ea6e4b60ad",
+        strip_prefix = "github.com/pingcap/kvproto@v0.0.0-20250108041715-3b77f2c65c63",
         urls = [
-            "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
-            "http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
-            "https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
-            "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20241120071417-b5b7843d9037.zip",
+            "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip",
+            "http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip",
+            "https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip",
+            "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20250108041715-3b77f2c65c63.zip",
         ],
     )
     go_repository(
diff --git a/br/cmd/br/operator.go b/br/cmd/br/operator.go
index 4e41adeab329f..abd0156a5457b 100644
--- a/br/cmd/br/operator.go
+++ b/br/cmd/br/operator.go
@@ -35,6 +35,8 @@ func newOperatorCommand() *cobra.Command {
 	cmd.AddCommand(newBase64ifyCommand())
 	cmd.AddCommand(newListMigrationsCommand())
 	cmd.AddCommand(newMigrateToCommand())
+	cmd.AddCommand(newForceFlushCommand())
+	cmd.AddCommand(newChecksumCommand())
 	return cmd
 }
 
@@ -109,3 +111,43 @@ func newMigrateToCommand() *cobra.Command {
 	operator.DefineFlagsForMigrateToConfig(cmd.Flags())
 	return cmd
 }
+
+func newChecksumCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "checksum-as",
+		Short: "calculate the checksum with rewrite rules",
+		Long: "Calculate the checksum of the current cluster (specified by `-u`), " +
+			"applying the rewrite rules generated from a backup (specified by `-s`). " +
+			"This can be used when you already have the checksum of the upstream cluster elsewhere.",
+		Args: cobra.NoArgs,
+		RunE: func(cmd *cobra.Command, args []string) error {
+			cfg := operator.ChecksumWithRewriteRulesConfig{}
+			if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
+				return err
+			}
+			ctx := GetDefaultContext()
+			return operator.RunChecksumTable(ctx, tidbGlue, cfg)
+		},
+	}
+	task.DefineFilterFlags(cmd, []string{"!*.*"}, false)
+	operator.DefineFlagsForChecksumTableConfig(cmd.Flags())
+	return cmd
+}
+
+func newForceFlushCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "force-flush",
+		Short: "force a log backup task to flush",
+		Args:  cobra.NoArgs,
+		RunE: func(cmd *cobra.Command, args []string) error {
+			cfg := operator.ForceFlushConfig{}
+			if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
+				return err
+			}
+			ctx := GetDefaultContext()
+			return operator.RunForceFlush(ctx, &cfg)
+		},
+	}
+	operator.DefineFlagsForForceFlushConfig(cmd.Flags())
+	return cmd
+}
diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go
index 22fa031854fbe..bbc74b0d44b85 100644
--- a/br/pkg/streamhelper/basic_lib_for_test.go
+++ b/br/pkg/streamhelper/basic_lib_for_test.go
@@ -183,6 +183,11 @@ func (f *fakeStore) GetID() uint64 {
 	return f.id
 }
 
+func (f *fakeStore) FlushNow(ctx context.Context, in *logbackup.FlushNowRequest, opts ...grpc.CallOption) (*logbackup.FlushNowResponse, error) {
+	f.flush()
+	return &logbackup.FlushNowResponse{Results: []*logbackup.FlushResult{{TaskName: "Universe", Success: true}}}, nil
+}
+
 func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) {
 	f.clientMu.Lock()
 	defer f.clientMu.Unlock()
diff --git a/br/pkg/task/operator/BUILD.bazel b/br/pkg/task/operator/BUILD.bazel
index 14760027a49b8..6d232d6c36bf0 100644
--- a/br/pkg/task/operator/BUILD.bazel
+++ b/br/pkg/task/operator/BUILD.bazel
@@ -4,7 +4,9 @@ go_library(
     name = "operator",
     srcs = [
        "base64ify.go",
+        "checksum_table.go",
         "config.go",
+        "force_flush.go",
         "list_migration.go",
         "migrate_to.go",
         "prepare_snap.go",
@@ -12,22 +14,37 @@
     importpath = "github.com/pingcap/tidb/br/pkg/task/operator",
     visibility = ["//visibility:public"],
     deps = [
+        "//br/pkg/backup",
         "//br/pkg/backup/prepare_snap",
+        "//br/pkg/checksum",
+        "//br/pkg/conn",
         "//br/pkg/errors",
         "//br/pkg/glue",
         "//br/pkg/logutil",
+        "//br/pkg/metautil",
         "//br/pkg/pdutil",
         "//br/pkg/storage",
         "//br/pkg/stream",
         "//br/pkg/task",
         "//br/pkg/utils",
+        "//pkg/domain",
+        "//pkg/meta/model",
+        "//pkg/util",
+        "//pkg/util/engine",
         "@com_github_fatih_color//:color",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_pingcap_kvproto//pkg/brpb",
+        "@com_github_pingcap_kvproto//pkg/logbackuppb",
+        "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_log//:log",
         "@com_github_spf13_pflag//:pflag",
+        "@com_github_tikv_client_go_v2//oracle",
         "@com_github_tikv_client_go_v2//tikv",
+        "@com_github_tikv_client_go_v2//util",
+        "@com_github_tikv_pd_client//:client",
+        "@com_github_tikv_pd_client//opt",
+        "@org_golang_google_grpc//:grpc",
         "@org_golang_google_grpc//keepalive",
         "@org_golang_x_sync//errgroup",
         "@org_uber_go_multierr//:multierr",
diff --git a/br/pkg/task/operator/checksum_table.go b/br/pkg/task/operator/checksum_table.go
new file mode 100644
index 0000000000000..59c52f6eb4cea
--- /dev/null
+++ b/br/pkg/task/operator/checksum_table.go
@@ -0,0 +1,272 @@
+package operator
+
+import (
+	"context"
+	"encoding/json"
+	"os"
+	"sync"
+	"sync/atomic"
+
+	"github.com/pingcap/errors"
+	backup "github.com/pingcap/kvproto/pkg/brpb"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/br/pkg/checksum"
+	"github.com/pingcap/tidb/br/pkg/conn"
+	"github.com/pingcap/tidb/br/pkg/glue"
+	"github.com/pingcap/tidb/br/pkg/metautil"
+	"github.com/pingcap/tidb/br/pkg/task"
+	"github.com/pingcap/tidb/pkg/domain"
+	"github.com/pingcap/tidb/pkg/meta/model"
+	"github.com/pingcap/tidb/pkg/util"
+	"github.com/tikv/client-go/v2/oracle"
+	kvutil "github.com/tikv/client-go/v2/util"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
+)
+
+type checksumTableCtx struct {
+	cfg ChecksumWithRewriteRulesConfig
+
+	mgr *conn.Mgr
+	dom *domain.Domain
+}
+
+type tableInDB struct {
+	info   *model.TableInfo
+	dbName string
+}
+
+func RunChecksumTable(ctx context.Context, g glue.Glue, cfg ChecksumWithRewriteRulesConfig) error {
+	c := &checksumTableCtx{cfg: cfg}
+
+	if err := c.init(ctx, g); err != nil {
+		return errors.Trace(err)
+	}
+
+	curr, err := c.getTables(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	old, err := c.loadOldTableIDs(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	reqs, err := c.genRequests(ctx, old, curr)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	results, err := c.runChecksum(ctx, reqs)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	for _, result := range results {
+		log.Info("Checksum result", zap.String("db", result.DBName), zap.String("table", result.TableName), zap.Uint64("checksum", result.Checksum),
+			zap.Uint64("total_bytes", result.TotalBytes), zap.Uint64("total_kvs", result.TotalKVs))
+	}
+
+	return json.NewEncoder(os.Stdout).Encode(results)
+}
+
+func (c *checksumTableCtx) init(ctx context.Context, g glue.Glue) error {
+	cfg := c.cfg
+	var err error
+	c.mgr, err = task.NewMgr(ctx, g, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg.Config), cfg.CheckRequirements, true, conn.NormalVersionChecker)
+	if err != nil {
+		return err
+	}
+
+	c.dom, err = g.GetDomain(c.mgr.GetStorage())
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (c *checksumTableCtx) getTables(ctx context.Context) (res []tableInDB, err error) {
+	sch := c.dom.InfoSchema()
+	dbs := sch.AllSchemas()
+	for _, db := range dbs {
+		if !c.cfg.TableFilter.MatchSchema(db.Name.L) {
+			continue
+		}
+
+		tbls, err := sch.SchemaTableInfos(ctx, db.Name)
+		if err != nil {
+			return nil, errors.Annotatef(err, "failed to load data for db %s", db.Name)
+		}
+		for _, tbl := range tbls {
+			if !c.cfg.TableFilter.MatchTable(db.Name.L, tbl.Name.L) {
+				continue
+			}
+			log.Info("Added table from cluster.", zap.String("db", db.Name.L), zap.String("table", tbl.Name.L))
+			res = append(res, tableInDB{
+				info:   tbl,
+				dbName: db.Name.L,
+			})
+		}
+	}
+
+	return
+}
+
+func (c *checksumTableCtx) loadOldTableIDs(ctx context.Context) (res []*metautil.Table, err error) {
+	_, strg, err := task.GetStorage(ctx, c.cfg.Storage, &c.cfg.Config)
+	if err != nil {
+		return nil, errors.Annotate(err, "failed to create storage")
+	}
+
+	mPath := metautil.MetaFile
+	metaContent, err := strg.ReadFile(ctx, mPath)
+	if err != nil {
+		return nil, errors.Annotatef(err, "failed to open metafile %s", mPath)
+	}
+
+	var backupMeta backup.BackupMeta
+	if err := backupMeta.Unmarshal(metaContent); err != nil {
+		return nil, errors.Annotate(err, "failed to parse backupmeta")
+	}
+
+	metaReader := metautil.NewMetaReader(&backupMeta, strg, &c.cfg.CipherInfo)
+
+	tblCh := make(chan *metautil.Table, 1024)
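+	// The backup meta is read in a streaming fashion: the goroutine below
+	// produces table metas into tblCh while the select loop consumes them;
+	// errCh is buffered so a single read error never blocks the producer.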
+	errCh := make(chan error, 1)
+	go func() {
+		if err := metaReader.ReadSchemasFiles(ctx, tblCh, metautil.SkipFiles, metautil.SkipStats); err != nil {
+			errCh <- errors.Annotate(err, "failed to read schema files")
+		}
+		close(tblCh)
+	}()
+
+	for {
+		select {
+		case err := <-errCh:
+			return nil, err
+		case tbl, ok := <-tblCh:
+			if !ok {
+				return
+			}
+			if !c.cfg.TableFilter.MatchTable(tbl.DB.Name.L, tbl.Info.Name.L) {
+				continue
+			}
+			log.Info("Added table from backup data.", zap.String("db", tbl.DB.Name.L), zap.String("table", tbl.Info.Name.L))
+			res = append(res, tbl)
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+	}
+}
+
+type request struct {
+	copReq    *checksum.Executor
+	tableName string
+	dbName    string
+}
+
+func (c *checksumTableCtx) genRequests(ctx context.Context, bkup []*metautil.Table, curr []tableInDB) (reqs []request, err error) {
+	phy, logi, err := c.mgr.GetPDClient().GetTS(ctx)
+	if err != nil {
+		return nil, errors.Annotate(err, "failed to get TSO for checksumming")
+	}
+	tso := oracle.ComposeTS(phy, logi)
+
+	bkupTbls := map[string]map[string]*metautil.Table{}
+	for _, t := range bkup {
+		m, ok := bkupTbls[t.DB.Name.L]
+		if !ok {
+			m = make(map[string]*metautil.Table)
+			bkupTbls[t.DB.Name.L] = m
+		}
+
+		m[t.Info.Name.L] = t
+	}
+
+	for _, t := range curr {
+		rb := checksum.NewExecutorBuilder(t.info, tso)
+		rb.SetConcurrency(c.cfg.ChecksumConcurrency)
+		oldDB, ok := bkupTbls[t.dbName]
+		if !ok {
+			log.Warn("db not found, will skip", zap.String("db", t.dbName))
+			continue
+		}
+		oldTable, ok := oldDB[t.info.Name.L]
+		if !ok {
+			log.Warn("table not found, will skip", zap.String("db", t.dbName), zap.String("table", t.info.Name.L))
+			continue
+		}
+
+		rb.SetOldTable(oldTable)
+		rb.SetExplicitRequestSourceType(kvutil.ExplicitTypeBR)
+		req, err := rb.Build()
+		if err != nil {
+			return nil, errors.Annotatef(err, "failed to build checksum executor for table %s.%s", t.dbName, t.info.Name.L)
+		}
+		reqs = append(reqs, request{
+			copReq:    req,
+			dbName:    t.dbName,
+			tableName: t.info.Name.L,
+		})
+	}
+
+	return
+}
+
+type ChecksumResult struct {
+	DBName    string `json:"db_name"`
+	TableName string `json:"table_name"`
+
+	Checksum   uint64 `json:"checksum"`
+	TotalBytes uint64 `json:"total_bytes"`
+	TotalKVs   uint64 `json:"total_kvs"`
+}
+
+func (c *checksumTableCtx) runChecksum(ctx context.Context, reqs []request) ([]ChecksumResult, error) {
+	wkPool := util.NewWorkerPool(c.cfg.TableConcurrency, "checksum")
+	eg, ectx := errgroup.WithContext(ctx)
+	results := make([]ChecksumResult, 0, len(reqs))
+	resultsMu := new(sync.Mutex)
+
+	for _, req := range reqs {
+		wkPool.ApplyOnErrorGroup(eg, func() error {
+			total := req.copReq.Len()
+			finished := new(atomic.Int64)
+			resp, err := req.copReq.Execute(ectx, c.mgr.GetStorage().GetClient(), func() {
+				finished.Add(1)
+				log.Info(
+					"Finish one request of a table.",
+					zap.String("db", req.dbName),
+					zap.String("table", req.tableName),
+					zap.Int64("finished", finished.Load()),
+					zap.Int64("total", int64(total)),
+				)
+			})
+			if err != nil {
+				return err
+			}
+			res := ChecksumResult{
+				DBName:    req.dbName,
+				TableName: req.tableName,
+
+				Checksum:   resp.Checksum,
+				TotalBytes: resp.TotalBytes,
+				TotalKVs:   resp.TotalKvs,
+			}
+			resultsMu.Lock()
+			results = append(results, res)
+			resultsMu.Unlock()
+			return nil
+		})
+	}
+
+	if err := eg.Wait(); err != nil {
+		return nil, err
+	}
+
+	return results, nil
+}
diff --git a/br/pkg/task/operator/config.go b/br/pkg/task/operator/config.go
index c42382abe504d..03996beed3011 100644
--- a/br/pkg/task/operator/config.go
+++ b/br/pkg/task/operator/config.go
@@ -3,15 +3,32 @@ package operator
 
 import (
+	"regexp"
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/br/pkg/backup"
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/task"
 	"github.com/spf13/pflag"
 )
 
+const (
+	flagTableConcurrency = "table-concurrency"
+	flagStorePatterns    = "stores"
+	flagTTL              = "ttl"
+	flagSafePoint        = "safepoint"
+	flagStorage          = "storage"
+	flagLoadCreds        = "load-creds"
+	flagJSON             = "json"
+	flagRecent           = "recent"
+	flagTo               = "to"
+	flagBase             = "base"
+	flagYes              = "yes"
+	flagDryRun           = "dry-run"
+)
+
 type PauseGcConfig struct {
 	task.Config
 
@@ -23,8 +40,8 @@ type PauseGcConfig struct {
 }
 
 func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) {
-	_ = f.DurationP("ttl", "i", 2*time.Minute, "The time-to-live of the safepoint.")
-	_ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.")
+	_ = f.DurationP(flagTTL, "i", 2*time.Minute, "The time-to-live of the safepoint.")
+	_ = f.Uint64P(flagSafePoint, "t", 0, "The GC safepoint to be kept.")
 }
 
 // ParseFromFlags fills the config via the flags.
@@ -34,11 +51,11 @@ func (cfg *PauseGcConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 	}
 
 	var err error
-	cfg.SafePoint, err = flags.GetUint64("safepoint")
+	cfg.SafePoint, err = flags.GetUint64(flagSafePoint)
 	if err != nil {
 		return err
 	}
-	cfg.TTL, err = flags.GetDuration("ttl")
+	cfg.TTL, err = flags.GetDuration(flagTTL)
 	if err != nil {
 		return err
 	}
@@ -54,8 +71,8 @@ type Base64ifyConfig struct {
 }
 
 func DefineFlagsForBase64ifyConfig(flags *pflag.FlagSet) {
 	storage.DefineFlags(flags)
-	flags.StringP("storage", "s", "", "The external storage input.")
-	flags.Bool("load-creds", false, "whether loading the credientials from current environment and marshal them to the base64 string. [!]")
+	flags.StringP(flagStorage, "s", "", "The external storage input.")
+	flags.Bool(flagLoadCreds, false, "whether to load the credentials from the current environment and marshal them into a base64 string. [!]")
 }
 
func (cfg *Base64ifyConfig) ParseFromFlags(flags *pflag.FlagSet) error {
@@ -64,11 +81,11 @@ func (cfg *Base64ifyConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 	if err != nil {
 		return err
 	}
-	cfg.StorageURI, err = flags.GetString("storage")
+	cfg.StorageURI, err = flags.GetString(flagStorage)
 	if err != nil {
 		return err
 	}
-	cfg.LoadCerd, err = flags.GetBool("load-creds")
+	cfg.LoadCerd, err = flags.GetBool(flagLoadCreds)
 	if err != nil {
 		return err
 	}
@@ -83,8 +100,8 @@ type ListMigrationConfig struct {
 }
 
 func DefineFlagsForListMigrationConfig(flags *pflag.FlagSet) {
 	storage.DefineFlags(flags)
-	flags.StringP("storage", "s", "", "the external storage input.")
-	flags.Bool("json", false, "output the result in json format.")
+	flags.StringP(flagStorage, "s", "", "the external storage input.")
+	flags.Bool(flagJSON, false, "output the result in json format.")
 }
 
func (cfg *ListMigrationConfig) ParseFromFlags(flags *pflag.FlagSet) error {
@@ -93,11 +110,11 @@ func (cfg *ListMigrationConfig) ParseFromFlags(flags *pflag.FlagSet) error {
 	if err != nil {
 		return err
 	}
-	cfg.StorageURI, err = flags.GetString("storage")
+	cfg.StorageURI, err = flags.GetString(flagStorage)
 	if err != nil {
 		return err
 	}
-	cfg.JSONOutput, err = flags.GetBool("json")
+	cfg.JSONOutput, err = flags.GetBool(flagJSON)
 	if err != nil {
 		return err
 	}
@@ -115,15 +132,6 @@ type MigrateToConfig struct {
 	DryRun bool
 }
 
-const (
-	flagStorage = "storage"
-	flagRecent  = "recent"
-	flagTo      = "to"
-	flagBase    = "base"
-	flagYes     = "yes"
-	flagDryRun  = "dry-run"
-)
-
 func DefineFlagsForMigrateToConfig(flags *pflag.FlagSet) {
 	storage.DefineFlags(flags)
 	flags.StringP(flagStorage, "s", "", "the external storage input.")
@@ -180,3 +188,43 @@ func (cfg *MigrateToConfig) Verify() error {
 	}
 	return nil
 }
+
+type ForceFlushConfig struct {
+	task.Config
+
+	StoresPattern *regexp.Regexp
+}
+
+func DefineFlagsForForceFlushConfig(f *pflag.FlagSet) {
+	f.String(flagStorePatterns, ".*", "The regexp to match the peer addresses of the stores to be force-flushed.")
+}
+
+func (cfg *ForceFlushConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) {
+	storePat, err := flags.GetString(flagStorePatterns)
+	if err != nil {
+		return err
+	}
+	cfg.StoresPattern, err = regexp.Compile(storePat)
+	if err != nil {
+		return errors.Annotatef(err, "invalid expression in --%s", flagStorePatterns)
+	}
+
+	return cfg.Config.ParseFromFlags(flags)
+}
+
+type ChecksumWithRewriteRulesConfig struct {
+	task.Config
+}
+
+func DefineFlagsForChecksumTableConfig(f *pflag.FlagSet) {
+	f.Uint(flagTableConcurrency, backup.DefaultSchemaConcurrency,
+		"The number of tables to checksum concurrently.")
+}
+
+func (cfg *ChecksumWithRewriteRulesConfig) ParseFromFlags(flags *pflag.FlagSet) (err error) {
+	cfg.TableConcurrency, err = flags.GetUint(flagTableConcurrency)
+	if err != nil {
+		return
+	}
+	return cfg.Config.ParseFromFlags(flags)
+}
diff --git a/br/pkg/task/operator/force_flush.go b/br/pkg/task/operator/force_flush.go
new file mode 100644
index 0000000000000..838c582e289a3
--- /dev/null
+++ b/br/pkg/task/operator/force_flush.go
@@ -0,0 +1,101 @@
+package operator
+
+import (
+	"context"
+	"crypto/tls"
+	"slices"
+
+	"github.com/pingcap/errors"
+	logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb/br/pkg/task"
+	"github.com/pingcap/tidb/br/pkg/utils"
+	"github.com/pingcap/tidb/pkg/util/engine"
+	pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" +) + +func getAllTiKVs(ctx context.Context, p pd.Client) ([]*metapb.Store, error) { + stores, err := p.GetAllStores(ctx, opt.WithExcludeTombstone()) + if err != nil { + return nil, err + } + withoutTiFlash := slices.DeleteFunc(stores, engine.IsTiFlash) + return withoutTiFlash, err +} + +func createStoreManager(pd pd.Client, cfg *task.Config) (*utils.StoreManager, error) { + var ( + tconf *tls.Config + err error + ) + + if cfg.TLS.IsEnabled() { + tconf, err = cfg.TLS.ToTLSConfig() + if err != nil { + return nil, errors.Annotate(err, "invalid tls config") + } + } + kvMgr := utils.NewStoreManager(pd, keepalive.ClientParameters{ + Time: cfg.GRPCKeepaliveTime, + Timeout: cfg.GRPCKeepaliveTimeout, + }, tconf) + return kvMgr, nil +} + +func RunForceFlush(ctx context.Context, cfg *ForceFlushConfig) error { + pdMgr, err := dialPD(ctx, &cfg.Config) + if err != nil { + return err + } + defer pdMgr.Close() + + stores, err := createStoreManager(pdMgr.GetPDClient(), &cfg.Config) + if err != nil { + return err + } + defer stores.Close() + + tikvs, err := getAllTiKVs(ctx, pdMgr.GetPDClient()) + if err != nil { + return err + } + eg, ectx := errgroup.WithContext(ctx) + log.Info("About to start force flushing.", zap.Stringer("stores-pattern", cfg.StoresPattern)) + for _, s := range tikvs { + if !cfg.StoresPattern.MatchString(s.Address) || engine.IsTiFlash(s) { + log.Info("Skipping TiFlash or not matched TiKV.", + zap.Uint64("store", s.GetId()), zap.String("addr", s.Address), zap.Bool("tiflash?", engine.IsTiFlash(s))) + } + log.Info("Starting force flush TiKV.", zap.Uint64("store", s.GetId()), zap.String("addr", s.Address)) + eg.Go(func() error { + var logBackupCli logbackup.LogBackupClient + err := stores.WithConn(ectx, s.GetId(), func(cc *grpc.ClientConn) { + logBackupCli = logbackup.NewLogBackupClient(cc) + }) + if err != nil { + return err + } + + resp, err := logBackupCli.FlushNow(ectx, &logbackup.FlushNowRequest{}) + if err != nil { + return errors.Annotatef(err, "failed to flush store %d", s.GetId()) + } + for _, res := range resp.Results { + if !res.Success { + return errors.Errorf("failed to flush task %s at store %d: %s", res.TaskName, s.GetId(), res.ErrorMessage) + } + log.Info("Force flushed task of TiKV store.", zap.Uint64("store", s.Id), zap.String("task", res.TaskName)) + } + return nil + }) + } + return eg.Wait() +} diff --git a/br/tests/br_test_utils.sh b/br/tests/br_test_utils.sh index 9102415a77e14..9d2c79fe5a452 100644 --- a/br/tests/br_test_utils.sh +++ b/br/tests/br_test_utils.sh @@ -22,6 +22,8 @@ wait_log_checkpoint_advance() { sleep 10 local current_ts=$(python3 -c "import time; print(int(time.time() * 1000) << 18)") echo "current ts: $current_ts" + + run_br --skip-goleak --pd $PD_ADDR operator force-flush || echo "failed to run force flush, the case may be slower." i=0 while true; do # extract the checkpoint ts of the log backup task. 
If there is some error, the checkpoint ts should be empty diff --git a/go.mod b/go.mod index 703c543cc7c01..7c271491e1b05 100644 --- a/go.mod +++ b/go.mod @@ -87,7 +87,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20240318064555-6bd07397691f github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 github.com/pingcap/fn v1.0.0 - github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 + github.com/pingcap/kvproto v0.0.0-20250108041715-3b77f2c65c63 github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5 github.com/pingcap/tidb/pkg/parser v0.0.0-20211011031125-9b13dc409c5e diff --git a/go.sum b/go.sum index b4c5f48cd5720..ed16fdd52adf4 100644 --- a/go.sum +++ b/go.sum @@ -676,8 +676,8 @@ github.com/pingcap/fn v1.0.0/go.mod h1:u9WZ1ZiOD1RpNhcI42RucFh/lBuzTu6rw88a+oF2Z github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 h1:xYNSJjYNur4Dr5bV+9BXK9n5E0T1zlcAN25XX68+mOg= -github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20250108041715-3b77f2c65c63 h1:ThJ7ddLJVk96Iai2HDeyJGuuhrcBtc3HwYKJfuKPLsI= +github.com/pingcap/kvproto v0.0.0-20250108041715-3b77f2c65c63/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8=
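
Usage sketch for the two new subcommands (hypothetical invocations; the PD
address, storage URI, and filter below are placeholders, while `--pd`, `-s`,
`-f`, and `--stores` come from the flags wired up in this patch):

    br operator force-flush --pd "$PD_ADDR" --stores '.*'
    br operator checksum-as --pd "$PD_ADDR" -s "s3://bucket/backup" -f 'test.*'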