diff --git a/pkg/statistics/handle/ddl/subscriber.go b/pkg/statistics/handle/ddl/subscriber.go index a4af15a1815d0f..15aea9298531ce 100644 --- a/pkg/statistics/handle/ddl/subscriber.go +++ b/pkg/statistics/handle/ddl/subscriber.go @@ -413,9 +413,11 @@ func updateGlobalTableStats4DropPartition( ctx, sctx, startTS, - variable.TableDelta{Count: count, Delta: delta}, - globalTableInfo.ID, - isLocked, + &storage.DeltaUpdate{ + TableID: globalTableInfo.ID, + Delta: variable.TableDelta{Count: count, Delta: delta}, + IsLocked: isLocked, + }, )) } @@ -597,9 +599,11 @@ func updateGlobalTableStats4TruncatePartition( ctx, sctx, startTS, - variable.TableDelta{Count: count, Delta: delta}, - globalTableInfo.ID, - isLocked, + &storage.DeltaUpdate{ + TableID: globalTableInfo.ID, + Delta: variable.TableDelta{Count: count, Delta: delta}, + IsLocked: isLocked, + }, ) if err != nil { fields := truncatePartitionsLogFields( diff --git a/pkg/statistics/handle/storage/update.go b/pkg/statistics/handle/storage/update.go index 008388ed76a67b..44e15e1bcad35b 100644 --- a/pkg/statistics/handle/storage/update.go +++ b/pkg/statistics/handle/storage/update.go @@ -56,55 +56,109 @@ func UpdateStatsVersion(ctx context.Context, sctx sessionctx.Context) error { return nil } -// UpdateStatsMeta update the stats meta stat for this Table. +// DeltaUpdate is the update for stats meta. +type DeltaUpdate struct { + Delta variable.TableDelta + TableID int64 + IsLocked bool +} + +// UpdateStatsMeta update the stats meta stat for multiple Tables. func UpdateStatsMeta( ctx context.Context, sctx sessionctx.Context, startTS uint64, - delta variable.TableDelta, - id int64, - isLocked bool, + updates ...*DeltaUpdate, ) (err error) { - if isLocked { - // use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_table_locked. - // Note: For locked tables, it is possible that the record gets deleted. So it can be negative. - _, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_table_locked (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+ - "update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)", - startTS, id, delta.Count, delta.Delta) - } else { - if delta.Delta < 0 { - // use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta. - _, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, 0) on duplicate key "+ - "update version = values(version), modify_count = modify_count + values(modify_count), count = if(count > %?, count - %?, 0)", - startTS, id, delta.Count, -delta.Delta, -delta.Delta) + if len(updates) == 0 { + return nil + } + + // Separate locked and unlocked updates + var lockedValues, unlockedPosValues, unlockedNegValues []string + var cacheInvalidateIDs []int64 + + for _, update := range updates { + if update.IsLocked { + lockedValues = append(lockedValues, fmt.Sprintf("(%d, %d, %d, %d)", + startTS, update.TableID, update.Delta.Count, update.Delta.Delta)) } else { - // use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta. - _, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+ - "update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)", startTS, - id, delta.Count, delta.Delta) + if update.Delta.Delta < 0 { + unlockedNegValues = append(unlockedNegValues, fmt.Sprintf("(%d, %d, %d, %d)", + startTS, update.TableID, update.Delta.Count, -update.Delta.Delta)) + } else { + unlockedPosValues = append(unlockedPosValues, fmt.Sprintf("(%d, %d, %d, %d)", + startTS, update.TableID, update.Delta.Count, update.Delta.Delta)) + } + cacheInvalidateIDs = append(cacheInvalidateIDs, update.TableID) + } + } + + // Execute locked updates + if len(lockedValues) > 0 { + sql := fmt.Sprintf("insert into mysql.stats_table_locked (version, table_id, modify_count, count) values %s "+ + "on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+ + "count = count + values(count)", strings.Join(lockedValues, ",")) + if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil { + return err + } + } + + // Execute unlocked updates with positive delta + if len(unlockedPosValues) > 0 { + sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+ + "on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+ + "count = count + values(count)", strings.Join(unlockedPosValues, ",")) + if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil { + return err } + } + + // Execute unlocked updates with negative delta + if len(unlockedNegValues) > 0 { + sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+ + "on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+ + "count = if(count > values(count), count - values(count), 0)", strings.Join(unlockedNegValues, ",")) + if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil { + return err + } + } + + // Invalidate cache for all unlocked tables + for _, id := range cacheInvalidateIDs { cache.TableRowStatsCache.Invalidate(id) } - return err + + return nil } // DumpTableStatColSizeToKV dumps the column size stats to storage. -func DumpTableStatColSizeToKV(sctx sessionctx.Context, id int64, delta variable.TableDelta) error { - if len(delta.ColSize) == 0 { +func DumpTableStatColSizeToKV(sctx sessionctx.Context, updates ...*DeltaUpdate) error { + if len(updates) == 0 { return nil } - values := make([]string, 0, len(delta.ColSize)) - for histID, deltaColSize := range delta.ColSize { - if deltaColSize == 0 { + + values := make([]string, 0) + for _, update := range updates { + if len(update.Delta.ColSize) == 0 { continue } - values = append(values, fmt.Sprintf("(%d, 0, %d, 0, %d)", id, histID, deltaColSize)) + for histID, deltaColSize := range update.Delta.ColSize { + if deltaColSize == 0 { + continue + } + values = append(values, fmt.Sprintf("(%d, 0, %d, 0, %d)", + update.TableID, histID, deltaColSize)) + } } + if len(values) == 0 { return nil } + sql := fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, tot_col_size) "+ - "values %s on duplicate key update tot_col_size = GREATEST(0, tot_col_size + values(tot_col_size))", strings.Join(values, ",")) + "values %s on duplicate key update tot_col_size = GREATEST(0, tot_col_size + values(tot_col_size))", + strings.Join(values, ",")) _, _, err := statsutil.ExecRows(sctx, sql) return errors.Trace(err) } diff --git a/pkg/statistics/handle/usage/BUILD.bazel b/pkg/statistics/handle/usage/BUILD.bazel index 9a02fa38858d7f..19e5a428044dd3 100644 --- a/pkg/statistics/handle/usage/BUILD.bazel +++ b/pkg/statistics/handle/usage/BUILD.bazel @@ -23,9 +23,11 @@ go_library( "//pkg/types", "//pkg/util", "//pkg/util/intest", + "//pkg/util/logutil", "//pkg/util/sqlescape", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/statistics/handle/usage/session_stats_collect.go b/pkg/statistics/handle/usage/session_stats_collect.go index 352a26dc1b9bc0..a5b43d137624cb 100644 --- a/pkg/statistics/handle/usage/session_stats_collect.go +++ b/pkg/statistics/handle/usage/session_stats_collect.go @@ -34,7 +34,9 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/intest" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/sqlescape" + "go.uber.org/zap" ) var ( @@ -82,6 +84,8 @@ func (s *statsUsageImpl) needDumpStatsDelta(is infoschema.InfoSchema, dumpAll bo return false } +const batchSize = 5000 + // DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV. // If the mode is `DumpDelta`, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio. func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error { @@ -96,93 +100,148 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error { s.SessionTableDelta().Merge(deltaMap) }() - return utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { - is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) - currentTime := time.Now() - for id, item := range deltaMap { - if !s.needDumpStatsDelta(is, dumpAll, id, item, currentTime) { - continue + // Sort table IDs + tableIDs := make([]int64, 0, len(deltaMap)) + for id := range deltaMap { + tableIDs = append(tableIDs, id) + } + slices.Sort(tableIDs) + defer func() { + logutil.BgLogger().Info("dump stats delta to kv", zap.Int("table_count", len(tableIDs))) + }() + + currentTime := time.Now() + // Process in batches + for i := 0; i < len(tableIDs); i += batchSize { + end := i + batchSize + if end > len(tableIDs) { + end = len(tableIDs) + } + + batchTableIDs := tableIDs[i:end] + var ( + statsVersion uint64 + batchUpdates []*storage.DeltaUpdate + ) + // Wrap each batch in its own transaction + err := utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { + is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) + batchUpdates = make([]*storage.DeltaUpdate, 0, len(batchTableIDs)) + + // First pass: collect all valid updates + for _, id := range batchTableIDs { + item := deltaMap[id] + if !s.needDumpStatsDelta(is, dumpAll, id, item, currentTime) { + continue + } + batchUpdates = append(batchUpdates, &storage.DeltaUpdate{ + TableID: id, + Delta: item, + }) + } + + if len(batchUpdates) == 0 { + return nil } - updated, err := s.dumpTableStatCountToKV(is, id, item) + + // Process batch updates in its own transaction + startTS, err := s.dumpTableStatCountToKVBatch(is, sctx, batchUpdates) if err != nil { return errors.Trace(err) } - if updated { - UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil) - } - if err = storage.DumpTableStatColSizeToKV(sctx, id, item); err != nil { - delete(deltaMap, id) + statsVersion = startTS + + // Batch dump column size stats + if err = storage.DumpTableStatColSizeToKV(sctx, batchUpdates...); err != nil { return errors.Trace(err) } - if updated { - delete(deltaMap, id) - } else { - m := deltaMap[id] - m.ColSize = nil - deltaMap[id] = m + + // Update deltaMap based on results + for _, update := range batchUpdates { + UpdateTableDeltaMap(deltaMap, update.TableID, -update.Delta.Delta, -update.Delta.Count, nil) + delete(deltaMap, update.TableID) + } + + return nil + }, utilstats.FlagWrapTxn) + if err != nil { + return errors.Trace(err) + } + + // Record historical stats meta for non-locked tables in a separate transaction. + for _, update := range batchUpdates { + if !update.IsLocked { + failpoint.Inject("panic-when-record-historical-stats-meta", func() { + panic("panic when record historical stats meta") + }) + s.statsHandle.RecordHistoricalStatsMeta(update.TableID, statsVersion, "flush stats", false) } } return nil - }) + } + + return nil } -// dumpTableStatDeltaToKV dumps a single delta with some table to KV and updates the version. -// For a partitioned table, we will update its global-stats as well. -func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableID int64, delta variable.TableDelta) (updated bool, err error) { - statsVersion := uint64(0) - isLocked := false - defer func() { - // Only record the historical stats meta when the table is not locked because all stats meta are stored in the locked table. - if err == nil && statsVersion != 0 && !isLocked { - failpoint.Inject("panic-when-record-historical-stats-meta", func() { - panic("panic when record historical stats meta") - }) - s.statsHandle.RecordHistoricalStatsMeta(physicalTableID, statsVersion, "flush stats", false) - } - }() - if delta.Count == 0 { - return true, nil +// dumpTableStatCountToKVBatch dumps multiple table stats count delta to KV in batch. +// It returns a slice of booleans indicating whether each table was updated. +func (s *statsUsageImpl) dumpTableStatCountToKVBatch(is infoschema.InfoSchema, sctx sessionctx.Context, updates []*storage.DeltaUpdate) (uint64, error) { + if len(updates) == 0 { + return 0, nil } - err = utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { - statsVersion, err = utilstats.GetStartTS(sctx) - if err != nil { - return errors.Trace(err) - } + var statsVersion uint64 - tbl, _, _ := is.FindTableByPartitionID(physicalTableID) - // Check if the table and its partitions are locked. - tidAndPid := make([]int64, 0, 2) - if tbl != nil { - tidAndPid = append(tidAndPid, tbl.Meta().ID) + var err error + statsVersion, err = utilstats.GetStartTS(sctx) + if err != nil { + return 0, errors.Trace(err) + } + + // Collect all table IDs that need lock checking + allTableIDs := make([]int64, 0, len(updates)*2) + for _, update := range updates { + if update.Delta.Count == 0 { + continue } - tidAndPid = append(tidAndPid, physicalTableID) - lockedTables, err := s.statsHandle.GetLockedTables(tidAndPid...) - if err != nil { - return err + // Add partition ID + allTableIDs = append(allTableIDs, update.TableID) + // Add parent table ID if it's a partition + if tbl, _, _ := is.FindTableByPartitionID(update.TableID); tbl != nil { + allTableIDs = append(allTableIDs, tbl.Meta().ID) + } + } + + // Batch get lock status + lockedTables, err := s.statsHandle.GetLockedTables(allTableIDs...) + if err != nil { + return 0, errors.Trace(err) + } + + // Prepare batch updates + for _, update := range updates { + if update.Delta.Count == 0 { + continue } - var affectedRows uint64 - // If it's a partitioned table and its global-stats exists, - // update its count and modify_count as well. + tbl, _, _ := is.FindTableByPartitionID(update.TableID) if tbl != nil { - // We need to check if the table and the partition are locked. + // Partitioned table + tableID := tbl.Meta().ID isTableLocked := false isPartitionLocked := false - tableID := tbl.Meta().ID + if _, ok := lockedTables[tableID]; ok { isTableLocked = true } - if _, ok := lockedTables[physicalTableID]; ok { + if _, ok := lockedTables[update.TableID]; ok { isPartitionLocked = true } + tableOrPartitionLocked := isTableLocked || isPartitionLocked - isLocked = tableOrPartitionLocked - if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta, - physicalTableID, tableOrPartitionLocked); err != nil { - return err - } - affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows() + // Add partition update + update.IsLocked = tableOrPartitionLocked + // If the partition is locked, we don't need to update the global-stats. // We will update its global-stats when the partition is unlocked. // 1. If table is locked and partition is locked, we only stash the delta in the partition's lock info. @@ -195,31 +254,28 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic // 4. If table is not locked and partition is not locked, we update the global-stats. // To sum up, we only need to update the global-stats when the table and the partition are not locked. if !isTableLocked && !isPartitionLocked { - // If it's a partitioned table and its global-stats exists, update its count and modify_count as well. - if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta, tableID, isTableLocked); err != nil { - return err - } - affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows() + updates = append(updates, &storage.DeltaUpdate{ + TableID: tableID, + Delta: update.Delta, + IsLocked: isTableLocked, + }) } } else { - // This is a non-partitioned table. - // Check if it's locked. + // Non-partitioned table isTableLocked := false - if _, ok := lockedTables[physicalTableID]; ok { + if _, ok := lockedTables[update.TableID]; ok { isTableLocked = true } - isLocked = isTableLocked - if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta, - physicalTableID, isTableLocked); err != nil { - return err - } - affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows() + update.IsLocked = isTableLocked } + } - updated = affectedRows > 0 - return nil - }, utilstats.FlagWrapTxn) - return + // Batch update stats meta + if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, updates...); err != nil { + return 0, errors.Trace(err) + } + + return statsVersion, nil } // DumpColStatsUsageToKV sweeps the whole list, updates the column stats usage map and dumps it to KV.