diff --git a/pkg/executor/historical_stats_test.go b/pkg/executor/historical_stats_test.go
index c448eb628c10f..bc7ad7f3f40e5 100644
--- a/pkg/executor/historical_stats_test.go
+++ b/pkg/executor/historical_stats_test.go
@@ -403,3 +403,39 @@ PARTITION p0 VALUES LESS THAN (6)
 	require.NotNil(t, jt)
 	require.False(t, jt.IsHistoricalStats)
 }
+
+func TestDumpHistoricalStatsMetaForMultiTables(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("set global tidb_enable_historical_stats = 1")
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t1(a int, b varchar(10), index idx(a, b))")
+	tk.MustExec("create table t2(a int, b varchar(10), index idx(a, b))")
+	// Insert some data.
+	tk.MustExec("insert into t1 values (1, 'a'), (2, 'b'), (3, 'c')")
+	tk.MustExec("insert into t2 values (1, 'a'), (2, 'b'), (3, 'c')")
+	// Analyze the tables.
+	tk.MustExec("analyze table t1")
+	tk.MustExec("analyze table t2")
+	h := dom.StatsHandle()
+	// Update the stats cache.
+	require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
+
+	// Insert more data.
+	tk.MustExec("insert into t1 values (4, 'd'), (5, 'e'), (6, 'f')")
+	tk.MustExec("insert into t2 values (4, 'd'), (5, 'e'), (6, 'f')")
+	// Dump stats delta to kv.
+	require.NoError(t, h.DumpStatsDeltaToKV(true))
+
+	// Check historical stats meta.
+	tbl1, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t1"))
+	require.NoError(t, err)
+	tbl2, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t2"))
+	require.NoError(t, err)
+	rows := tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tbl1.Meta().ID).Rows()
+	version1 := rows[0][0].(string)
+	rows = tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tbl2.Meta().ID).Rows()
+	version2 := rows[0][0].(string)
+	require.Equal(t, version1, version2)
+}
diff --git a/pkg/statistics/handle/ddl/subscriber.go b/pkg/statistics/handle/ddl/subscriber.go
index a4af15a1815d0..7398e9edb0795 100644
--- a/pkg/statistics/handle/ddl/subscriber.go
+++ b/pkg/statistics/handle/ddl/subscriber.go
@@ -304,9 +304,9 @@ func (h subscriber) recordHistoricalStatsMeta(
 	return history.RecordHistoricalStatsMeta(
 		ctx,
 		sctx,
-		id,
 		startTS,
 		util.StatsMetaHistorySourceSchemaChange,
+		id,
 	)
 }
diff --git a/pkg/statistics/handle/history/BUILD.bazel b/pkg/statistics/handle/history/BUILD.bazel
index 1a1a44f789cea..d5fad1ce83f81 100644
--- a/pkg/statistics/handle/history/BUILD.bazel
+++ b/pkg/statistics/handle/history/BUILD.bazel
@@ -9,10 +9,16 @@ go_library(
         "//pkg/meta/model",
         "//pkg/sessionctx",
         "//pkg/statistics/handle/cache",
+        "//pkg/statistics/handle/logutil",
         "//pkg/statistics/handle/storage",
         "//pkg/statistics/handle/types",
         "//pkg/statistics/handle/util",
+<<<<<<< HEAD
         "//pkg/util/logutil",
+=======
+        "//pkg/statistics/util",
+        "//pkg/util/intest",
+>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
         "@com_github_pingcap_errors//:errors",
         "@org_uber_go_zap//:zap",
     ],
diff --git a/pkg/statistics/handle/history/history_stats.go b/pkg/statistics/handle/history/history_stats.go
index 1382d6ddbde4f..f3f57f1dfd41d 100644
--- a/pkg/statistics/handle/history/history_stats.go
+++ b/pkg/statistics/handle/history/history_stats.go
@@ -16,16 +16,27 @@ package history
 import (
     "context"
+    "fmt"
+    "slices"
+    "strconv"
+    "strings"
     "time"
 
     "github.com/pingcap/errors"
     "github.com/pingcap/tidb/pkg/meta/model"
     "github.com/pingcap/tidb/pkg/sessionctx"
     "github.com/pingcap/tidb/pkg/statistics/handle/cache"
+    statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
     "github.com/pingcap/tidb/pkg/statistics/handle/storage"
     "github.com/pingcap/tidb/pkg/statistics/handle/types"
+<<<<<<< HEAD
     "github.com/pingcap/tidb/pkg/statistics/handle/util"
     "github.com/pingcap/tidb/pkg/util/logutil"
+=======
+    handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
+    statsutil "github.com/pingcap/tidb/pkg/statistics/util"
+    "github.com/pingcap/tidb/pkg/util/intest"
+>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
     "go.uber.org/zap"
 )
@@ -55,7 +66,7 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI
         return 0, errors.Trace(err)
     }
     if js == nil {
-        logutil.BgLogger().Warn("no stats data to record", zap.String("dbName", dbName), zap.String("tableName", tableInfo.Name.O))
+        statslogutil.StatsLogger().Warn("no stats data to record", zap.String("dbName", dbName), zap.String("tableName", tableInfo.Name.O))
         return 0, nil
     }
     var version uint64
@@ -66,20 +77,26 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI
     return version, err
 }
 
-// RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history table.
-func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool) {
+// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history with a single transaction.
+func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(version uint64, source string, enforce bool, tableIDs ...int64) {
     if version == 0 {
         return
     }
-    if !enforce {
-        tbl, ok := sh.statsHandle.Get(tableID)
-        if !ok {
-            return
-        }
-        if !tbl.IsInitialized() {
-            return
+
+    var targetedTableIDs []int64
+    if enforce {
+        targetedTableIDs = tableIDs
+    } else {
+        targetedTableIDs = make([]int64, 0, len(tableIDs))
+        for _, tableID := range tableIDs {
+            tbl, ok := sh.statsHandle.Get(tableID)
+            if tableID == 0 || !ok || !tbl.IsInitialized() {
+                continue
+            }
+            targetedTableIDs = append(targetedTableIDs, tableID)
         }
     }
+<<<<<<< HEAD
     err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
         if !sctx.GetSessionVars().EnableHistoricalStats {
             return nil
@@ -89,8 +106,24 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uin
         if err != nil {
             // just log the error, hide the error from the outside caller.
             logutil.BgLogger().Error("record historical stats meta failed", zap.Int64("table-id", tableID),
+=======
+    // Sort the tableIDs to avoid deadlocks.
+    slices.Sort(targetedTableIDs)
+
+    err := handleutil.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
+        if !sctx.GetSessionVars().EnableHistoricalStats {
+            return nil
+        }
+        return RecordHistoricalStatsMeta(handleutil.StatsCtx, sctx, version, source, targetedTableIDs...)
+    }, handleutil.FlagWrapTxn)
+
+    if err != nil {
+        statslogutil.StatsLogger().Error("record historical stats meta failed",
+>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
             zap.Uint64("version", version),
             zap.String("source", source),
+            zap.Int64s("tableIDs", tableIDs),
+            zap.Int64s("targetedTableIDs", targetedTableIDs),
             zap.Error(err))
     }
 }
@@ -104,17 +137,30 @@ func (sh *statsHistoryImpl) CheckHistoricalStatsEnable() (enable bool, err error
     return
 }
 
-// RecordHistoricalStatsMeta records the historical stats meta.
+// RecordHistoricalStatsMeta records the historical stats meta for multiple tables.
 func RecordHistoricalStatsMeta(
     ctx context.Context,
     sctx sessionctx.Context,
-    tableID int64,
     version uint64,
     source string,
+    tableIDs ...int64,
 ) error {
-    if tableID == 0 || version == 0 {
-        return errors.Errorf("tableID %d, version %d are invalid", tableID, version)
+    intest.Assert(version != 0, "version should not be zero")
+    intest.AssertFunc(func() bool {
+        for _, id := range tableIDs {
+            if id == 0 {
+                return false
+            }
+        }
+        return true
+    }, "tableIDs should not contain 0")
+    intest.AssertFunc(func() bool {
+        return slices.IsSorted(tableIDs)
+    }, "tableIDs should be sorted")
+    if len(tableIDs) == 0 {
+        return nil
     }
+<<<<<<< HEAD
     rows, _, err := util.ExecRowsWithCtx(
         ctx,
         sctx,
@@ -122,14 +168,71 @@ func RecordHistoricalStatsMeta(
         tableID,
         version,
     )
-    if err != nil {
+=======
+
+    // Enable prepared statement cache to avoid repeated compilation of the same statement.
+    if _, err := handleutil.ExecWithCtx(ctx, sctx, "SET tidb_enable_prepared_plan_cache = ON"); err != nil {
         return errors.Trace(err)
     }
-    if len(rows) == 0 {
-        return errors.New("no historical meta stats can be recorded")
+    defer func() {
+        _, err := handleutil.ExecWithCtx(ctx, sctx, "SET tidb_enable_prepared_plan_cache = OFF")
+        if err != nil {
+            statslogutil.StatsLogger().Error("failed to disable prepared statement cache", zap.Error(errors.Trace(err)))
+        }
+    }()
+    prepareSelectForUpdate := `
+        PREPARE select_stmt FROM 'SELECT * FROM mysql.stats_meta WHERE table_id = ? AND version = ? FOR UPDATE'
+    `
+    if _, err := handleutil.ExecWithCtx(ctx, sctx, prepareSelectForUpdate); err != nil {
+        return errors.Trace(err)
     }
-    modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)
+    defer func() {
+        _, err := handleutil.ExecWithCtx(ctx, sctx, "DEALLOCATE PREPARE select_stmt")
+        if err != nil {
+            statslogutil.StatsLogger().Error("failed to deallocate prepared statement", zap.Error(errors.Trace(err)))
+        }
+    }()
+
+    // Lock the rows one by one to avoid deadlocks.
+    for _, tableID := range tableIDs {
+        _, err := handleutil.ExecWithCtx(ctx, sctx, fmt.Sprintf("SET @table_id = %d, @version = %d", tableID, version))
+        if err != nil {
+            return errors.Trace(err)
+        }
+        rows, _, err := handleutil.ExecRowsWithCtx(ctx, sctx, "EXECUTE select_stmt USING @table_id, @version")
+        if err != nil {
+            return errors.Trace(err)
+        }
+        intest.Assert(len(rows) != 0, "no historical meta stats can be recorded")
+        if len(rows) == 0 {
+            statslogutil.StatsLogger().Warn("no historical meta stats can be recorded",
+                zap.Int64("tableID", tableID),
+                zap.Uint64("version", version),
+            )
+        }
+    }
+
+    // Convert tableIDs to string for SQL IN clause
+    tableIDStrs := make([]string, 0, len(tableIDs))
+    for _, id := range tableIDs {
+        tableIDStrs = append(tableIDStrs, strconv.FormatInt(id, 10))
+    }
+    tableIDsStr := strings.Join(tableIDStrs, ",")
+
+    // Single query that combines SELECT and INSERT
+    sql := fmt.Sprintf(`REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time)
+        SELECT table_id, modify_count, count, %d, '%s', NOW()
+        FROM mysql.stats_meta
+        WHERE table_id IN (%s) AND version = %d`,
+        version, source, tableIDsStr, version)
+    _, err := handleutil.ExecWithCtx(ctx, sctx, sql)
+>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
+    if err != nil {
+        return errors.Trace(err)
+    }
+
+<<<<<<< HEAD
     const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())"
     if _, err := util.ExecWithCtx(
         ctx,
@@ -142,8 +245,13 @@ func RecordHistoricalStatsMeta(
         source,
     ); err != nil {
         return errors.Trace(err)
+=======
+    // Invalidate cache for all tables
+    for _, tableID := range tableIDs {
+        cache.TableRowStatsCache.Invalidate(tableID)
+>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
     }
-    cache.TableRowStatsCache.Invalidate(tableID)
+
     return nil
 }
diff --git a/pkg/statistics/handle/storage/stats_read_writer.go b/pkg/statistics/handle/storage/stats_read_writer.go
index ca3180fbf03b8..4937c6986874b 100644
--- a/pkg/statistics/handle/storage/stats_read_writer.go
+++ b/pkg/statistics/handle/storage/stats_read_writer.go
@@ -56,7 +56,7 @@ func (s *statsReadWriter) InsertColStats2KV(physicalID int64, colInfos []*model.
     statsVer := uint64(0)
     defer func() {
         if err == nil && statsVer != 0 {
-            s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
+            s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID)
         }
     }()
@@ -76,7 +76,7 @@ func (s *statsReadWriter) InsertTableStats2KV(info *model.TableInfo, physicalID
     statsVer := uint64(0)
     defer func() {
         if err == nil && statsVer != 0 {
-            s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
+            s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID)
         }
     }()
@@ -103,7 +103,7 @@ func (s *statsReadWriter) UpdateStatsMetaVersionForGC(physicalID int64) (err err
     statsVer := uint64(0)
     defer func() {
         if err == nil && statsVer != 0 {
-            s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
+            s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID)
         }
     }()
@@ -134,7 +134,7 @@ func (s *statsReadWriter) SaveTableStatsToStorage(results *statistics.AnalyzeRes
     }, util.FlagWrapTxn)
     if err == nil && statsVer != 0 {
         tableID := results.TableID.GetStatisticsID()
-        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, true)
+        s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, true, tableID)
     }
     return err
 }
@@ -185,7 +185,7 @@ func (s *statsReadWriter) SaveStatsToStorage(
         return err
     }, util.FlagWrapTxn)
     if err == nil && statsVer != 0 {
-        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
+        s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, false, tableID)
     }
     return
 }
@@ -198,7 +198,7 @@ func (s *statsReadWriter) SaveMetaToStorage(tableID, count, modifyCount int64, s
         return err
     }, util.FlagWrapTxn)
     if err == nil && statsVer != 0 {
-        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
+        s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, false, tableID)
     }
     return
 }
@@ -211,7 +211,7 @@ func (s *statsReadWriter) InsertExtendedStats(statsName string, colIDs []int64,
         return err
     }, util.FlagWrapTxn)
     if err == nil && statsVer != 0 {
-        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
+        s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID)
     }
     return
 }
@@ -224,7 +224,7 @@ func (s *statsReadWriter) MarkExtendedStatsDeleted(statsName string, tableID int
         return err
     }, util.FlagWrapTxn)
     if err == nil && statsVer != 0 {
-        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
+        s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID)
     }
     return
 }
@@ -237,7 +237,7 @@ func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *st
         return err
     }, util.FlagWrapTxn)
     if err == nil && statsVer != 0 {
-        s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
+        s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID)
     }
     return
 }
diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go
index 40028176ad98b..20ad4c7840387 100644
--- a/pkg/statistics/handle/types/interfaces.go
+++ b/pkg/statistics/handle/types/interfaces.go
@@ -108,8 +108,8 @@ type IndexUsage interface {
 
 // StatsHistory is used to manage historical stats.
 type StatsHistory interface {
-    // RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history.
-    RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool)
+    // RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history with a single transaction.
+    RecordHistoricalStatsMeta(version uint64, source string, enforce bool, tableIDs ...int64)
 
     // CheckHistoricalStatsEnable check whether historical stats is enabled.
     CheckHistoricalStatsEnable() (enable bool, err error)
diff --git a/pkg/statistics/handle/usage/session_stats_collect.go b/pkg/statistics/handle/usage/session_stats_collect.go
index 18bab8f48723a..590a95f2884fa 100644
--- a/pkg/statistics/handle/usage/session_stats_collect.go
+++ b/pkg/statistics/handle/usage/session_stats_collect.go
@@ -97,12 +97,45 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error {
         s.SessionTableDelta().Merge(deltaMap)
     }()
 
+<<<<<<< HEAD
     return utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
         is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
         currentTime := time.Now()
         for id, item := range deltaMap {
             if !s.needDumpStatsDelta(is, dumpAll, id, item, currentTime) {
                 continue
+=======
+    // Sort table IDs to ensure a consistent dump order to reduce the chance of deadlock.
+    tableIDs := make([]int64, 0, len(deltaMap))
+    for id := range deltaMap {
+        tableIDs = append(tableIDs, id)
+    }
+    slices.Sort(tableIDs)
+
+    // Dump stats delta in batches.
+    for i := 0; i < len(tableIDs); i += dumpDeltaBatchSize {
+        end := i + dumpDeltaBatchSize
+        if end > len(tableIDs) {
+            end = len(tableIDs)
+        }
+
+        batchTableIDs := tableIDs[i:end]
+        var (
+            statsVersion uint64
+            batchUpdates []*storage.DeltaUpdate
+        )
+        batchStart := time.Now()
+        err := utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error {
+            is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
+            batchUpdates = make([]*storage.DeltaUpdate, 0, len(batchTableIDs))
+            // Collect all updates in the batch.
+            for _, id := range batchTableIDs {
+                item := deltaMap[id]
+                if !s.needDumpStatsDelta(is, dumpAll, id, item, batchStart) {
+                    continue
+                }
+                batchUpdates = append(batchUpdates, storage.NewDeltaUpdate(id, item, false))
+>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
             }
             updated, err := s.dumpTableStatCountToKV(is, id, item)
             if err != nil {
@@ -150,6 +183,7 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
         if err != nil {
             return errors.Trace(err)
         }
+<<<<<<< HEAD
         tbl, _, _ := is.FindTableByPartitionID(physicalTableID)
 
         // Check if the table and its partitions are locked.
@@ -161,6 +195,79 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic
         lockedTables, err := s.statsHandle.GetLockedTables(tidAndPid...)
         if err != nil {
             return err
+=======
+        startRecordHistoricalStatsMeta := time.Now()
+        unlockedTableIDs := make([]int64, 0, len(batchUpdates))
+        for _, update := range batchUpdates {
+            if !update.IsLocked {
+                failpoint.Inject("panic-when-record-historical-stats-meta", func() {
+                    panic("panic when record historical stats meta")
+                })
+                unlockedTableIDs = append(unlockedTableIDs, update.TableID)
+            }
+        }
+        s.statsHandle.RecordHistoricalStatsMeta(statsVersion, "flush stats", false, unlockedTableIDs...)
+        // Log a warning if recording historical stats meta takes too long, as it can be slow for large table counts
+        if time.Since(startRecordHistoricalStatsMeta) > time.Minute*15 {
+            statslogutil.SingletonStatsSamplerLogger().Warn("Recording historical stats meta is too slow",
+                zap.Int("tableCount", len(batchUpdates)),
+                zap.Duration("duration", time.Since(startRecordHistoricalStatsMeta)))
+        }
+    }
+
+    return nil
+}
+
+// dumpStatsDeltaToKV processes and writes multiple table stats count deltas to KV storage in batches.
+// Note: The `batchUpdates` parameter may be modified during the execution of this function.
+//
+// 1. Handles partitioned tables:
+//    - For partitioned tables, the function ensures that the global statistics are updated appropriately
+//      in addition to the individual partition statistics.
+//
+// 2. Stashes lock information:
+//    - Records lock information for each table or partition.
+func (s *statsUsageImpl) dumpStatsDeltaToKV(
+    is infoschema.InfoSchema,
+    sctx sessionctx.Context,
+    updates []*storage.DeltaUpdate,
+) (statsVersion uint64, updated []*storage.DeltaUpdate, err error) {
+    if len(updates) == 0 {
+        return 0, nil, nil
+    }
+    beforeLen := len(updates)
+    statsVersion, err = utilstats.GetStartTS(sctx)
+    if err != nil {
+        return 0, nil, errors.Trace(err)
+    }
+
+    // Collect all table IDs that need lock checking.
+    allTableIDs := make([]int64, 0, len(updates))
+    for _, update := range updates {
+        // No need to update if the delta is zero.
+        if update.Delta.Count == 0 {
+            continue
+        }
+        // Add physical table ID.
+        allTableIDs = append(allTableIDs, update.TableID)
+        // Add parent table ID if it's a partition table.
+        if tbl, _, _ := is.FindTableByPartitionID(update.TableID); tbl != nil {
+            allTableIDs = append(allTableIDs, tbl.Meta().ID)
+        }
+    }
+
+    // Batch get lock status for all tables.
+    lockedTables, err := s.statsHandle.GetLockedTables(allTableIDs...)
+    if err != nil {
+        return 0, nil, errors.Trace(err)
+    }
+
+    // Prepare batch updates
+    for _, update := range updates {
+        // No need to update if the delta is zero.
+        if update.Delta.Count == 0 {
+            continue
+>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
         }
 
         var affectedRows uint64