Skip to content

Commit

Permalink
statistics: refactor RecordHistoricalStatsMeta to handle single table (
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Jan 23, 2025
1 parent b2e5d7e commit 6117d70
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 90 deletions.
142 changes: 53 additions & 89 deletions pkg/statistics/handle/history/history_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ package history

import (
"context"
"fmt"
"slices"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -71,7 +67,7 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI
return version, err
}

// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history with a single transaction.
// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history one by one.
func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(version uint64, source string, enforce bool, tableIDs ...int64) {
if version == 0 {
return
Expand All @@ -90,23 +86,36 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(version uint64, source str
targetedTableIDs = append(targetedTableIDs, tableID)
}
}
// Sort the tableIDs to avoid deadlocks.
slices.Sort(targetedTableIDs)

shouldSkipHistoricalStats := false
err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
if !sctx.GetSessionVars().EnableHistoricalStats {
shouldSkipHistoricalStats = true
return nil
}
return RecordHistoricalStatsMeta(util.StatsCtx, sctx, version, source, targetedTableIDs...)
return nil
}, util.FlagWrapTxn)

if err != nil {
statslogutil.StatsLogger().Error("record historical stats meta failed",
statslogutil.StatsLogger().Error("failed to check historical stats enable status",
zap.Uint64("version", version),
zap.String("source", source),
zap.Int64s("tableIDs", tableIDs),
zap.Int64s("targetedTableIDs", targetedTableIDs),
zap.Error(err))
return
}
if !shouldSkipHistoricalStats {
for _, tableID := range targetedTableIDs {
err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error {
return RecordHistoricalStatsMeta(util.StatsCtx, sctx, version, source, tableID)
}, util.FlagWrapTxn)
if err != nil {
statslogutil.StatsLogger().Error("record historical stats meta failed",
zap.Uint64("version", version),
zap.String("source", source),
zap.Int64("tableID", tableID),
zap.Error(err))
}
}
}
}

Expand All @@ -119,95 +128,50 @@ func (sh *statsHistoryImpl) CheckHistoricalStatsEnable() (enable bool, err error
return
}

// RecordHistoricalStatsMeta records the historical stats meta for multiple tables.
// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history with the given version and source.
func RecordHistoricalStatsMeta(
ctx context.Context,
sctx sessionctx.Context,
version uint64,
source string,
tableIDs ...int64,
tableID int64,
) error {
intest.Assert(version != 0, "version should not be zero")
intest.AssertFunc(func() bool {
for _, id := range tableIDs {
if id == 0 {
return false
}
}
return true
}, "tableIDs should not contain 0")
intest.AssertFunc(func() bool {
return slices.IsSorted(tableIDs)
}, "tableIDs should be sorted")
if len(tableIDs) == 0 {
return nil
}

// Enable prepared statement cache to avoid repeated compilation of the same statement.
if _, err := util.ExecWithCtx(ctx, sctx, "SET tidb_enable_prepared_plan_cache = ON"); err != nil {
return errors.Trace(err)
}
defer func() {
_, err := util.ExecWithCtx(ctx, sctx, "SET tidb_enable_prepared_plan_cache = OFF")
if err != nil {
statslogutil.StatsLogger().Error("failed to disable prepared statement cache", zap.Error(errors.Trace(err)))
}
}()
prepareSelectForUpdate := `
PREPARE select_stmt FROM 'SELECT * FROM mysql.stats_meta WHERE table_id = ? AND version = ? FOR UPDATE'
`
if _, err := util.ExecWithCtx(ctx, sctx, prepareSelectForUpdate); err != nil {
return errors.Trace(err)
}
defer func() {
_, err := util.ExecWithCtx(ctx, sctx, "DEALLOCATE PREPARE select_stmt")
if err != nil {
statslogutil.StatsLogger().Error("failed to deallocate prepared statement", zap.Error(errors.Trace(err)))
}
}()

// Lock the rows one by one to avoid deadlocks.
for _, tableID := range tableIDs {
_, err := util.ExecWithCtx(ctx, sctx, fmt.Sprintf("SET @table_id = %d, @version = %d", tableID, version))
if err != nil {
return errors.Trace(err)
}
rows, _, err := util.ExecRowsWithCtx(ctx, sctx, "EXECUTE select_stmt USING @table_id, @version")
if err != nil {
return errors.Trace(err)
}
intest.Assert(len(rows) != 0, "no historical meta stats can be recorded")
if len(rows) == 0 {
statslogutil.StatsLogger().Warn("no historical meta stats can be recorded",
zap.Int64("tableID", tableID),
zap.Uint64("version", version),
)
}
}

// Convert tableIDs to string for SQL IN clause
tableIDStrs := make([]string, 0, len(tableIDs))
for _, id := range tableIDs {
tableIDStrs = append(tableIDStrs, strconv.FormatInt(id, 10))
}
tableIDsStr := strings.Join(tableIDStrs, ",")

// Single query that combines SELECT and INSERT
sql := fmt.Sprintf(`REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time)
SELECT table_id, modify_count, count, %d, '%s', NOW()
FROM mysql.stats_meta
WHERE table_id IN (%s) AND version = %d`,
version, source, tableIDsStr, version)

_, err := util.ExecWithCtx(ctx, sctx, sql)
intest.Assert(tableID != 0, "tableID should not be zero")
if tableID == 0 || version == 0 {
return errors.Errorf("tableID %d, version %d are invalid", tableID, version)
}

rows, _, err := util.ExecRowsWithCtx(
ctx,
sctx,
"SELECT modify_count, count FROM mysql.stats_meta WHERE table_id = %? and version = %? FOR UPDATE",
tableID,
version,
)
if err != nil {
return errors.Trace(err)
}

// Invalidate cache for all tables
for _, tableID := range tableIDs {
cache.TableRowStatsCache.Invalidate(tableID)
intest.Assert(len(rows) != 0, "no historical meta stats can be recorded")
if len(rows) == 0 {
return errors.New("no historical meta stats can be recorded")
}

modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1)
const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())"
if _, err := util.ExecWithCtx(
ctx,
sctx,
sql,
tableID,
modifyCount,
count,
version,
source,
); err != nil {
return errors.Trace(err)
}
cache.TableRowStatsCache.Invalidate(tableID)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type IndexUsage interface {

// StatsHistory is used to manage historical stats.
type StatsHistory interface {
// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history with a single transaction.
// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history one by one.
RecordHistoricalStatsMeta(version uint64, source string, enforce bool, tableIDs ...int64)

// CheckHistoricalStatsEnable check whether historical stats is enabled.
Expand Down

0 comments on commit 6117d70

Please sign in to comment.