Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037) #59120

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions pkg/executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,3 +403,39 @@ PARTITION p0 VALUES LESS THAN (6)
require.NotNil(t, jt)
require.False(t, jt.IsHistoricalStats)
}

func TestDumpHistoricalStatsMetaForMultiTables(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_historical_stats = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t1(a int, b varchar(10), index idx(a, b))")
tk.MustExec("create table t2(a int, b varchar(10), index idx(a, b))")
// Insert some data.
tk.MustExec("insert into t1 values (1, 'a'), (2, 'b'), (3, 'c')")
tk.MustExec("insert into t2 values (1, 'a'), (2, 'b'), (3, 'c')")
// Analyze the tables.
tk.MustExec("analyze table t1")
tk.MustExec("analyze table t2")
h := dom.StatsHandle()
// Update the stats cache.
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))

// Insert more data.
tk.MustExec("insert into t1 values (4, 'd'), (5, 'e'), (6, 'f')")
tk.MustExec("insert into t2 values (4, 'd'), (5, 'e'), (6, 'f')")
// Dump stats delta to kv.
require.NoError(t, h.DumpStatsDeltaToKV(true))

// Check historical stats meta.
tbl1, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t1"))
require.NoError(t, err)
tbl2, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t2"))
require.NoError(t, err)
rows := tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tbl1.Meta().ID).Rows()
version1 := rows[0][0].(string)
rows = tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tbl2.Meta().ID).Rows()
version2 := rows[0][0].(string)
require.Equal(t, version1, version2)
}
2 changes: 1 addition & 1 deletion pkg/statistics/handle/ddl/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,9 @@ func (h subscriber) recordHistoricalStatsMeta(
return history.RecordHistoricalStatsMeta(
ctx,
sctx,
id,
startTS,
util.StatsMetaHistorySourceSchemaChange,
id,
)
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/statistics/handle/history/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ go_library(
"//pkg/meta/model",
"//pkg/sessionctx",
"//pkg/statistics/handle/cache",
"//pkg/statistics/handle/logutil",
"//pkg/statistics/handle/storage",
"//pkg/statistics/handle/types",
"//pkg/statistics/handle/util",
<<<<<<< HEAD
"//pkg/util/logutil",
=======
"//pkg/statistics/util",
"//pkg/util/intest",
>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
"@com_github_pingcap_errors//:errors",
"@org_uber_go_zap//:zap",
],
Expand Down
146 changes: 127 additions & 19 deletions pkg/statistics/handle/history/history_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,27 @@ package history

import (
"context"
"fmt"
"slices"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics/handle/cache"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
<<<<<<< HEAD
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/logutil"
=======
handleutil "github.com/pingcap/tidb/pkg/statistics/handle/util"
statsutil "github.com/pingcap/tidb/pkg/statistics/util"
"github.com/pingcap/tidb/pkg/util/intest"
>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
"go.uber.org/zap"
)

Expand Down Expand Up @@ -55,7 +66,7 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI
return 0, errors.Trace(err)
}
if js == nil {
logutil.BgLogger().Warn("no stats data to record", zap.String("dbName", dbName), zap.String("tableName", tableInfo.Name.O))
statslogutil.StatsLogger().Warn("no stats data to record", zap.String("dbName", dbName), zap.String("tableName", tableInfo.Name.O))
return 0, nil
}
var version uint64
Expand All @@ -66,20 +77,26 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI
return version, err
}

// RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history table.
func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool) {
// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history with a single transaction.
func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(version uint64, source string, enforce bool, tableIDs ...int64) {
if version == 0 {
return
}
if !enforce {
tbl, ok := sh.statsHandle.Get(tableID)
if !ok {
return
}
if !tbl.IsInitialized() {
return

var targetedTableIDs []int64
if enforce {
targetedTableIDs = tableIDs
} else {
targetedTableIDs = make([]int64, 0, len(tableIDs))
for _, tableID := range tableIDs {
tbl, ok := sh.statsHandle.Get(tableID)
if tableID == 0 || !ok || !tbl.IsInitialized() {
continue
}
targetedTableIDs = append(targetedTableIDs, tableID)
}
}
<<<<<<< HEAD
err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
if !sctx.GetSessionVars().EnableHistoricalStats {
return nil
Expand All @@ -89,8 +106,24 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uin
if err != nil { // just log the error, hide the error from the outside caller.
logutil.BgLogger().Error("record historical stats meta failed",
zap.Int64("table-id", tableID),
=======
// Sort the tableIDs to avoid deadlocks.
slices.Sort(targetedTableIDs)

err := handleutil.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
if !sctx.GetSessionVars().EnableHistoricalStats {
return nil
}
return RecordHistoricalStatsMeta(handleutil.StatsCtx, sctx, version, source, targetedTableIDs...)
}, handleutil.FlagWrapTxn)

if err != nil {
statslogutil.StatsLogger().Error("record historical stats meta failed",
>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
zap.Uint64("version", version),
zap.String("source", source),
zap.Int64s("tableIDs", tableIDs),
zap.Int64s("targetedTableIDs", targetedTableIDs),
zap.Error(err))
}
}
Expand All @@ -104,32 +137,102 @@ func (sh *statsHistoryImpl) CheckHistoricalStatsEnable() (enable bool, err error
return
}

// RecordHistoricalStatsMeta records the historical stats meta.
// RecordHistoricalStatsMeta records the historical stats meta for multiple tables.
func RecordHistoricalStatsMeta(
ctx context.Context,
sctx sessionctx.Context,
tableID int64,
version uint64,
source string,
tableIDs ...int64,
) error {
if tableID == 0 || version == 0 {
return errors.Errorf("tableID %d, version %d are invalid", tableID, version)
intest.Assert(version != 0, "version should not be zero")
intest.AssertFunc(func() bool {
for _, id := range tableIDs {
if id == 0 {
return false
}
}
return true
}, "tableIDs should not contain 0")
intest.AssertFunc(func() bool {
return slices.IsSorted(tableIDs)
}, "tableIDs should be sorted")
if len(tableIDs) == 0 {
return nil
}
<<<<<<< HEAD
rows, _, err := util.ExecRowsWithCtx(
ctx,
sctx,
"select modify_count, count from mysql.stats_meta where table_id = %? and version = %?",
tableID,
version,
)
if err != nil {
=======

// Enable prepared statement cache to avoid repeated compilation of the same statement.
if _, err := handleutil.ExecWithCtx(ctx, sctx, "SET tidb_enable_prepared_plan_cache = ON"); err != nil {
return errors.Trace(err)
}
if len(rows) == 0 {
return errors.New("no historical meta stats can be recorded")
defer func() {
_, err := handleutil.ExecWithCtx(ctx, sctx, "SET tidb_enable_prepared_plan_cache = OFF")
if err != nil {
statslogutil.StatsLogger().Error("failed to disable prepared statement cache", zap.Error(errors.Trace(err)))
}
}()
prepareSelectForUpdate := `
PREPARE select_stmt FROM 'SELECT * FROM mysql.stats_meta WHERE table_id = ? AND version = ? FOR UPDATE'
`
if _, err := handleutil.ExecWithCtx(ctx, sctx, prepareSelectForUpdate); err != nil {
return errors.Trace(err)
}
modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)
defer func() {
_, err := handleutil.ExecWithCtx(ctx, sctx, "DEALLOCATE PREPARE select_stmt")
if err != nil {
statslogutil.StatsLogger().Error("failed to deallocate prepared statement", zap.Error(errors.Trace(err)))
}
}()

// Lock the rows one by one to avoid deadlocks.
for _, tableID := range tableIDs {
_, err := handleutil.ExecWithCtx(ctx, sctx, fmt.Sprintf("SET @table_id = %d, @version = %d", tableID, version))
if err != nil {
return errors.Trace(err)
}
rows, _, err := handleutil.ExecRowsWithCtx(ctx, sctx, "EXECUTE select_stmt USING @table_id, @version")
if err != nil {
return errors.Trace(err)
}
intest.Assert(len(rows) != 0, "no historical meta stats can be recorded")
if len(rows) == 0 {
statslogutil.StatsLogger().Warn("no historical meta stats can be recorded",
zap.Int64("tableID", tableID),
zap.Uint64("version", version),
)
}
}

// Convert tableIDs to string for SQL IN clause
tableIDStrs := make([]string, 0, len(tableIDs))
for _, id := range tableIDs {
tableIDStrs = append(tableIDStrs, strconv.FormatInt(id, 10))
}
tableIDsStr := strings.Join(tableIDStrs, ",")

// Single query that combines SELECT and INSERT
sql := fmt.Sprintf(`REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time)
SELECT table_id, modify_count, count, %d, '%s', NOW()
FROM mysql.stats_meta
WHERE table_id IN (%s) AND version = %d`,
version, source, tableIDsStr, version)

_, err := handleutil.ExecWithCtx(ctx, sctx, sql)
>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
if err != nil {
return errors.Trace(err)
}

<<<<<<< HEAD
const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())"
if _, err := util.ExecWithCtx(
ctx,
Expand All @@ -142,8 +245,13 @@ func RecordHistoricalStatsMeta(
source,
); err != nil {
return errors.Trace(err)
=======
// Invalidate cache for all tables
for _, tableID := range tableIDs {
cache.TableRowStatsCache.Invalidate(tableID)
>>>>>>> a1beeb1200e (statistics: update RecordHistoricalStatsMeta to handle multiple table IDs (#59037))
}
cache.TableRowStatsCache.Invalidate(tableID)

return nil
}

Expand Down
18 changes: 9 additions & 9 deletions pkg/statistics/handle/storage/stats_read_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *statsReadWriter) InsertColStats2KV(physicalID int64, colInfos []*model.
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID)
}
}()

Expand All @@ -76,7 +76,7 @@ func (s *statsReadWriter) InsertTableStats2KV(info *model.TableInfo, physicalID
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID)
}
}()

Expand All @@ -103,7 +103,7 @@ func (s *statsReadWriter) UpdateStatsMetaVersionForGC(physicalID int64) (err err
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID)
}
}()

Expand Down Expand Up @@ -134,7 +134,7 @@ func (s *statsReadWriter) SaveTableStatsToStorage(results *statistics.AnalyzeRes
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
tableID := results.TableID.GetStatisticsID()
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, true)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, true, tableID)
}
return err
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (s *statsReadWriter) SaveStatsToStorage(
return err
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, false, tableID)
}
return
}
Expand All @@ -198,7 +198,7 @@ func (s *statsReadWriter) SaveMetaToStorage(tableID, count, modifyCount int64, s
return err
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, false, tableID)
}
return
}
Expand All @@ -211,7 +211,7 @@ func (s *statsReadWriter) InsertExtendedStats(statsName string, colIDs []int64,
return err
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID)
}
return
}
Expand All @@ -224,7 +224,7 @@ func (s *statsReadWriter) MarkExtendedStatsDeleted(statsName string, tableID int
return err
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID)
}
return
}
Expand All @@ -237,7 +237,7 @@ func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *st
return err
}, util.FlagWrapTxn)
if err == nil && statsVer != 0 {
s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false)
s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID)
}
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ type IndexUsage interface {

// StatsHistory is used to manage historical stats.
type StatsHistory interface {
// RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history.
RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool)
// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history with a single transaction.
RecordHistoricalStatsMeta(version uint64, source string, enforce bool, tableIDs ...int64)

// CheckHistoricalStatsEnable check whether historical stats is enabled.
CheckHistoricalStatsEnable() (enable bool, err error)
Expand Down
Loading