POC: Batch stats delta dumping
winoros committed Dec 23, 2024
1 parent 3e42487 commit 70caa9e
Showing 4 changed files with 229 additions and 113 deletions.
16 changes: 10 additions & 6 deletions pkg/statistics/handle/ddl/subscriber.go
@@ -413,9 +413,11 @@ func updateGlobalTableStats4DropPartition(
ctx,
sctx,
startTS,
variable.TableDelta{Count: count, Delta: delta},
globalTableInfo.ID,
isLocked,
&storage.DeltaUpdate{
TableID: globalTableInfo.ID,
Delta: variable.TableDelta{Count: count, Delta: delta},
IsLocked: isLocked,
},
))
}

@@ -597,9 +599,11 @@ func updateGlobalTableStats4TruncatePartition(
ctx,
sctx,
startTS,
variable.TableDelta{Count: count, Delta: delta},
globalTableInfo.ID,
isLocked,
&storage.DeltaUpdate{
TableID: globalTableInfo.ID,
Delta: variable.TableDelta{Count: count, Delta: delta},
IsLocked: isLocked,
},
)
if err != nil {
fields := truncatePartitionsLogFields(
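Both call sites above still pass a single update; the point of the new variadic parameter is that a caller can fold many deltas into one round trip. A minimal sketch of such a batching caller, assuming a partitionDeltas map and an isTableLocked helper that are not part of this commit:

	// Hypothetical batching caller (not in this commit): collect one DeltaUpdate
	// per partition, then flush them with a single UpdateStatsMeta call instead
	// of issuing one statement per partition.
	updates := make([]*storage.DeltaUpdate, 0, len(partitionDeltas))
	for physicalID, delta := range partitionDeltas {
		updates = append(updates, &storage.DeltaUpdate{
			TableID:  physicalID,
			Delta:    delta,
			IsLocked: isTableLocked(physicalID), // assumed helper, not in this commit
		})
	}
	if err := storage.UpdateStatsMeta(ctx, sctx, startTS, updates...); err != nil {
		return err
	}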
110 changes: 82 additions & 28 deletions pkg/statistics/handle/storage/update.go
@@ -56,55 +56,109 @@ func UpdateStatsVersion(ctx context.Context, sctx sessionctx.Context) error {
return nil
}

// UpdateStatsMeta update the stats meta stat for this Table.
// DeltaUpdate is the update for stats meta.
type DeltaUpdate struct {
Delta variable.TableDelta
TableID int64
IsLocked bool
}

// UpdateStatsMeta updates the stats meta for multiple tables.
func UpdateStatsMeta(
ctx context.Context,
sctx sessionctx.Context,
startTS uint64,
delta variable.TableDelta,
id int64,
isLocked bool,
updates ...*DeltaUpdate,
) (err error) {
if isLocked {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_table_locked.
// Note: For locked tables, it is possible that the record gets deleted. So it can be negative.
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_table_locked (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)",
startTS, id, delta.Count, delta.Delta)
} else {
if delta.Delta < 0 {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta.
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, 0) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = if(count > %?, count - %?, 0)",
startTS, id, delta.Count, -delta.Delta, -delta.Delta)
if len(updates) == 0 {
return nil
}

// Separate locked and unlocked updates
var lockedValues, unlockedPosValues, unlockedNegValues []string
var cacheInvalidateIDs []int64

for _, update := range updates {
if update.IsLocked {
lockedValues = append(lockedValues, fmt.Sprintf("(%d, %d, %d, %d)",
startTS, update.TableID, update.Delta.Count, update.Delta.Delta))
} else {
// use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta.
_, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+
"update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)", startTS,
id, delta.Count, delta.Delta)
if update.Delta.Delta < 0 {
unlockedNegValues = append(unlockedNegValues, fmt.Sprintf("(%d, %d, %d, %d)",
startTS, update.TableID, update.Delta.Count, -update.Delta.Delta))
} else {
unlockedPosValues = append(unlockedPosValues, fmt.Sprintf("(%d, %d, %d, %d)",
startTS, update.TableID, update.Delta.Count, update.Delta.Delta))
}
cacheInvalidateIDs = append(cacheInvalidateIDs, update.TableID)
}
}

// Execute locked updates
if len(lockedValues) > 0 {
sql := fmt.Sprintf("insert into mysql.stats_table_locked (version, table_id, modify_count, count) values %s "+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
"count = count + values(count)", strings.Join(lockedValues, ","))
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
return err
}
}

// Execute unlocked updates with positive delta
if len(unlockedPosValues) > 0 {
sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
"count = count + values(count)", strings.Join(unlockedPosValues, ","))
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
return err
}
}

// Execute unlocked updates with negative delta
if len(unlockedNegValues) > 0 {
sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+
"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
"count = if(count > values(count), count - values(count), 0)", strings.Join(unlockedNegValues, ","))
if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil {
return err
}
}

// Invalidate cache for all unlocked tables
for _, id := range cacheInvalidateIDs {
cache.TableRowStatsCache.Invalidate(id)
}
return err

return nil
}

// DumpTableStatColSizeToKV dumps the column size stats to storage.
func DumpTableStatColSizeToKV(sctx sessionctx.Context, id int64, delta variable.TableDelta) error {
if len(delta.ColSize) == 0 {
func DumpTableStatColSizeToKV(sctx sessionctx.Context, updates ...*DeltaUpdate) error {
if len(updates) == 0 {
return nil
}
values := make([]string, 0, len(delta.ColSize))
for histID, deltaColSize := range delta.ColSize {
if deltaColSize == 0 {

values := make([]string, 0)
for _, update := range updates {
if len(update.Delta.ColSize) == 0 {
continue
}
values = append(values, fmt.Sprintf("(%d, 0, %d, 0, %d)", id, histID, deltaColSize))
for histID, deltaColSize := range update.Delta.ColSize {
if deltaColSize == 0 {
continue
}
values = append(values, fmt.Sprintf("(%d, 0, %d, 0, %d)",
update.TableID, histID, deltaColSize))
}
}

if len(values) == 0 {
return nil
}

sql := fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, tot_col_size) "+
"values %s on duplicate key update tot_col_size = GREATEST(0, tot_col_size + values(tot_col_size))", strings.Join(values, ","))
"values %s on duplicate key update tot_col_size = GREATEST(0, tot_col_size + values(tot_col_size))",
strings.Join(values, ","))
_, _, err := statsutil.ExecRows(sctx, sql)
return errors.Trace(err)
}
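One detail worth calling out in the batched UpdateStatsMeta: positive and negative deltas go into separate statements because the count expression differs, and in the negative case the delta is negated before it enters the VALUES list, so values(count) carries the magnitude to subtract. A standalone sketch (made-up table IDs and counts) that mirrors the string building above and prints the two resulting statements:

package main

import (
	"fmt"
	"strings"
)

func main() {
	startTS := uint64(449348)

	// Table 101 gained 10 rows (positive delta); table 102 lost 4 (negative delta).
	pos := []string{fmt.Sprintf("(%d, %d, %d, %d)", startTS, 101, 12, 10)}
	neg := []string{fmt.Sprintf("(%d, %d, %d, %d)", startTS, 102, 4, 4)} // -(-4): magnitude only

	fmt.Printf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+
		"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
		"count = count + values(count)\n", strings.Join(pos, ","))
	fmt.Printf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+
		"on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+
		"count = if(count > values(count), count - values(count), 0)\n", strings.Join(neg, ","))
}

Note that the batch path interpolates the integers directly with fmt.Sprintf rather than the %? placeholders the old code used; since every value is an integer formatted with %d this is still safe, though it diverges from the placeholder style of the previous implementation.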
2 changes: 2 additions & 0 deletions pkg/statistics/handle/usage/BUILD.bazel
@@ -23,9 +23,11 @@ go_library(
"//pkg/types",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/sqlescape",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@org_uber_go_zap//:zap",
],
)

