statistics: optimize stats delta dumping with batch processing #59150
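In outline, this change removes per-column size tracking (`Progress` / `ColSizeMap`) from the IMPORT INTO path and dumps accumulated stats deltas in batches; the new test below verifies that two tables flushed together end up with the same stats version. As a rough sketch of the batching idea only — `tableDelta`, `dumpStatsDeltaBatched`, and `batchExec` are illustrative names, not TiDB's actual API:

```go
package main

import "fmt"

// tableDelta mirrors the shape of a pending stats delta: the net row-count
// change and the modify count accumulated for one table. (Illustrative only.)
type tableDelta struct {
	delta int64
	count int64
}

// dumpStatsDeltaBatched flushes pending deltas in fixed-size groups, so many
// tables are written (and versioned) together instead of one statement per
// table. batchExec stands in for a single batched write to mysql.stats_meta.
func dumpStatsDeltaBatched(pending map[int64]tableDelta, batchSize int,
	batchExec func(ids []int64, deltas []tableDelta) error) error {
	ids := make([]int64, 0, batchSize)
	deltas := make([]tableDelta, 0, batchSize)
	flush := func() error {
		if len(ids) == 0 {
			return nil
		}
		if err := batchExec(ids, deltas); err != nil {
			return err
		}
		ids, deltas = ids[:0], deltas[:0]
		return nil
	}
	for id, d := range pending {
		ids = append(ids, id)
		deltas = append(deltas, d)
		if len(ids) >= batchSize {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	return flush() // flush the final partial batch
}

func main() {
	pending := map[int64]tableDelta{101: {3, 3}, 102: {3, 3}, 103: {1, 1}}
	_ = dumpStatsDeltaBatched(pending, 2, func(ids []int64, _ []tableDelta) error {
		fmt.Printf("one batched write covers tables %v\n", ids)
		return nil
	})
}
```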

2 changes: 0 additions & 2 deletions pkg/disttask/importinto/proto.go
@@ -110,7 +110,6 @@ type SharedVars struct {
TableImporter *importer.TableImporter
DataEngine *backend.OpenedEngine
IndexEngine *backend.OpenedEngine
- Progress *importer.Progress

mu sync.Mutex
Checksum *verification.KVGroupChecksum
@@ -183,5 +182,4 @@ type Checksum struct {
// This portion of the code may be implemented uniformly in the framework in the future.
type Result struct {
LoadedRowCnt uint64
- ColSizeMap map[int64]int64
}
8 changes: 1 addition & 7 deletions pkg/disttask/importinto/scheduler.go
@@ -577,14 +577,9 @@ func updateResult(handle storage.TaskHandle, task *proto.Task, taskMeta *TaskMet
}
subtaskMetas = append(subtaskMetas, &subtaskMeta)
}
- columnSizeMap := make(map[int64]int64)
for _, subtaskMeta := range subtaskMetas {
taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt
- for key, val := range subtaskMeta.Result.ColSizeMap {
- columnSizeMap[key] += val
- }
}
- taskMeta.Result.ColSizeMap = columnSizeMap

if globalSort {
taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, task)
@@ -662,8 +657,7 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger
func(ctx context.Context) (bool, error) {
return true, taskHandle.WithNewSession(func(se sessionctx.Context) error {
if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{
- Affected: taskMeta.Result.LoadedRowCnt,
- ColSizeMap: taskMeta.Result.ColSizeMap,
+ Affected: taskMeta.Result.LoadedRowCnt,
}); err != nil {
logger.Warn("flush table stats failed", zap.Error(err))
}
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/subtask_executor.go
@@ -69,7 +69,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
sharedVars.DataEngine,
sharedVars.IndexEngine,
- sharedVars.Progress,
logger,
checksum,
); err != nil {
@@ -82,7 +81,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
dataWriter,
indexWriter,
- sharedVars.Progress,
logger,
checksum,
); err != nil {
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/task_executor.go
@@ -154,7 +154,6 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
TableImporter: s.tableImporter,
DataEngine: dataEngine,
IndexEngine: indexEngine,
- Progress: importer.NewProgress(),
Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()),
SortedDataMeta: &external.SortedKVMeta{},
SortedIndexMetas: make(map[int64]*external.SortedKVMeta),
@@ -251,7 +250,6 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt
}
subtaskMeta.Result = Result{
LoadedRowCnt: dataKVCount,
- ColSizeMap: sharedVars.Progress.GetColSize(),
}
allocators := sharedVars.TableImporter.Allocators()
subtaskMeta.MaxIDs = map[autoid.AllocatorType]int64{
28 changes: 28 additions & 0 deletions pkg/domain/domain.go
@@ -2370,6 +2370,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
return nil
}
do.SetStatsUpdating(true)
+ do.wg.Run(do.asyncLoadHistogram, "asyncLoadHistogram")
// The stats updated worker doesn't require the stats initialization to be completed.
// This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations.
// These tasks do not interfere with or depend on the initialization process.
@@ -2522,6 +2523,33 @@ func (do *Domain) loadStatsWorker() {
if err != nil {
logutil.BgLogger().Warn("update stats info failed", zap.Error(err))
}
+ case <-do.exit:
+ return
+ }
+ }
+ }
+
+ func (do *Domain) asyncLoadHistogram() {
+ defer util.Recover(metrics.LabelDomain, "asyncLoadStats", nil, false)
+ lease := do.statsLease
+ if lease == 0 {
+ lease = 3 * time.Second
+ }
+ cleanupTicker := time.NewTicker(lease)
+ defer func() {
+ cleanupTicker.Stop()
+ logutil.BgLogger().Info("asyncLoadStats exited.")
+ }()
+ select {
+ case <-do.StatsHandle().InitStatsDone:
+ case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
+ return
+ }
+ statsHandle := do.StatsHandle()
+ var err error
+ for {
+ select {
+ case <-cleanupTicker.C:
err = statsHandle.LoadNeededHistograms(do.InfoSchema())
if err != nil {
logutil.BgLogger().Warn("load histograms failed", zap.Error(err))
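The domain.go hunk above moves histogram loading out of `loadStatsWorker` into a dedicated `asyncLoadHistogram` goroutine that waits for stats initialization (or early shutdown) before entering its ticker loop. A minimal, self-contained sketch of that worker pattern — the channels and `loadNeededHistograms` here are simplified stand-ins for `InitStatsDone`, `do.exit`, and the real stats-handle call:

```go
package main

import (
	"fmt"
	"time"
)

// asyncLoadWorker sketches the shape of asyncLoadHistogram: block until
// initialization finishes (or the server exits first, e.g. on Ctrl+C),
// then load needed histograms on every tick until shutdown.
func asyncLoadWorker(initDone, exit <-chan struct{}, lease time.Duration,
	loadNeededHistograms func() error) {
	ticker := time.NewTicker(lease)
	defer ticker.Stop()

	select {
	case <-initDone: // stats initialization completed
	case <-exit: // shutdown arrived before init finished
		return
	}
	for {
		select {
		case <-ticker.C:
			if err := loadNeededHistograms(); err != nil {
				fmt.Println("load histograms failed:", err)
			}
		case <-exit:
			return
		}
	}
}

func main() {
	initDone, exit := make(chan struct{}), make(chan struct{})
	go asyncLoadWorker(initDone, exit, 10*time.Millisecond, func() error {
		fmt.Println("loading needed histograms")
		return nil
	})
	close(initDone)                   // simulate InitStatsDone
	time.Sleep(35 * time.Millisecond) // let a few ticks fire
	close(exit)                       // simulate server shutdown
	time.Sleep(5 * time.Millisecond)
}
```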
2 changes: 1 addition & 1 deletion pkg/executor/delete.go
@@ -274,7 +274,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl
return err
}

- err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.ExtraPartialRowOption)
+ err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.IndexesRowLayout)
if err != nil {
return err
}
36 changes: 36 additions & 0 deletions pkg/executor/historical_stats_test.go
@@ -403,3 +403,39 @@ PARTITION p0 VALUES LESS THAN (6)
require.NotNil(t, jt)
require.False(t, jt.IsHistoricalStats)
}

+ func TestDumpHistoricalStatsMetaForMultiTables(t *testing.T) {
+ store, dom := testkit.CreateMockStoreAndDomain(t)
+ tk := testkit.NewTestKit(t, store)
+ tk.MustExec("set global tidb_enable_historical_stats = 1")
+ tk.MustExec("use test")
+ tk.MustExec("drop table if exists t")
+ tk.MustExec("create table t1(a int, b varchar(10), index idx(a, b))")
+ tk.MustExec("create table t2(a int, b varchar(10), index idx(a, b))")
+ // Insert some data.
+ tk.MustExec("insert into t1 values (1, 'a'), (2, 'b'), (3, 'c')")
+ tk.MustExec("insert into t2 values (1, 'a'), (2, 'b'), (3, 'c')")
+ // Analyze the tables.
+ tk.MustExec("analyze table t1")
+ tk.MustExec("analyze table t2")
+ h := dom.StatsHandle()
+ // Update the stats cache.
+ require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
+
+ // Insert more data.
+ tk.MustExec("insert into t1 values (4, 'd'), (5, 'e'), (6, 'f')")
+ tk.MustExec("insert into t2 values (4, 'd'), (5, 'e'), (6, 'f')")
+ // Dump stats delta to kv.
+ require.NoError(t, h.DumpStatsDeltaToKV(true))
+
+ // Check historical stats meta.
+ tbl1, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t1"))
+ require.NoError(t, err)
+ tbl2, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t2"))
+ require.NoError(t, err)
+ rows := tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tbl1.Meta().ID).Rows()
+ version1 := rows[0][0].(string)
+ rows = tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tbl2.Meta().ID).Rows()
+ version2 := rows[0][0].(string)
+ require.Equal(t, version1, version2)
+ }
1 change: 0 additions & 1 deletion pkg/executor/importer/BUILD.bazel
@@ -9,7 +9,6 @@ go_library(
"job.go",
"kv_encode.go",
"precheck.go",
- "progress.go",
"table_import.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor/importer",
5 changes: 1 addition & 4 deletions pkg/executor/importer/engine_process.go
@@ -30,7 +30,6 @@ func ProcessChunk(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataEngine, indexEngine *backend.OpenedEngine,
- progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
@@ -65,7 +64,7 @@ func ProcessChunk(
}
}()

- return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum)
+ return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, logger, groupChecksum)
}

// ProcessChunkWithWriter processes a chunk, and write kv pairs to dataWriter and indexWriter.
Expand All @@ -74,7 +73,6 @@ func ProcessChunkWithWriter(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataWriter, indexWriter backend.EngineWriter,
- progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
@@ -116,6 +114,5 @@ func ProcessChunkWithWriter(
if err != nil {
return err
}
- progress.AddColSize(encoder.GetColumnSize())
return nil
}
5 changes: 2 additions & 3 deletions pkg/executor/importer/import.go
@@ -1395,9 +1395,8 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType {

// JobImportResult is the result of the job import.
type JobImportResult struct {
- Affected uint64
- Warnings []contextutil.SQLWarn
- ColSizeMap variable.DeltaColsMap
+ Affected uint64
+ Warnings []contextutil.SQLWarn
}

// GetMsgFromBRError get msg from BR error.
8 changes: 2 additions & 6 deletions pkg/executor/importer/importer_testkit_test.go
@@ -314,11 +314,9 @@ func TestProcessChunkWith(t *testing.T) {
defer ti.Backend().CloseEngineMgr()
kvWriter := mock.NewMockEngineWriter(ctrl)
kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
- progress := importer.NewProgress()
checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
- err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
+ err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
require.NoError(t, err)
- require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID])
@@ -346,11 +344,9 @@ func TestProcessChunkWith(t *testing.T) {
ti.SetSelectedRowCh(rowsCh)
kvWriter := mock.NewMockEngineWriter(ctrl)
kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
- progress := importer.NewProgress()
checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
- err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
+ err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
require.NoError(t, err)
- require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(111, 3, 14585065391351463171), *checksumMap[verify.DataKVGroupID])
6 changes: 0 additions & 6 deletions pkg/executor/importer/kv_encode.go
@@ -35,8 +35,6 @@ import (
// KVEncoder encodes a row of data into a KV pair.
type KVEncoder interface {
Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
- // GetColumnSize returns the size of each column in the current encoder.
- GetColumnSize() map[int64]int64
io.Closer
}

@@ -91,10 +89,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
return en.Record2KV(record, row, rowID)
}

- func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
- return en.SessionCtx.GetColumnSize(en.TableMeta().ID)
- }
-
// todo merge with code in load_data.go
func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) {
row := make([]types.Datum, 0, len(en.insertColumns))
49 changes: 0 additions & 49 deletions pkg/executor/importer/progress.go

This file was deleted.

16 changes: 5 additions & 11 deletions pkg/executor/importer/table_import.go
@@ -654,25 +654,20 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C
}

var (
- mu sync.Mutex
- checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
- colSizeMap = make(map[int64]int64)
+ mu sync.Mutex
+ checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
)
eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx)
for i := 0; i < ti.ThreadCnt; i++ {
eg.Go(func() error {
chunkCheckpoint := checkpoints.ChunkCheckpoint{}
chunkChecksum := verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
- progress := NewProgress()
defer func() {
mu.Lock()
defer mu.Unlock()
checksum.Add(chunkChecksum)
- for k, v := range progress.GetColSize() {
- colSizeMap[k] += v
- }
}()
- return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, progress, ti.logger, chunkChecksum)
+ return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, ti.logger, chunkChecksum)
})
}
if err = eg.Wait(); err != nil {
@@ -716,8 +711,7 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C
}

return &JobImportResult{
- Affected: uint64(dataKVCount),
- ColSizeMap: colSizeMap,
+ Affected: uint64(dataKVCount),
}, nil
}

@@ -976,7 +970,7 @@ func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64,
sessionVars := se.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
defer sessionVars.TxnCtxMu.Unlock()
- sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected), result.ColSizeMap)
+ sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected))
se.StmtCommit(ctx)
return se.CommitTxn(ctx)
}
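With `ColSizeMap` dropped, `UpdateDeltaForTable` takes only the table ID, row-count delta, and modify count, so the per-transaction bookkeeping for a table reduces to two counters. A simplified sketch of that slimmed-down delta map — the `txnCtx`/`tableDelta` types and field names are illustrative, not the actual `sessionctx/variable` definitions:

```go
package main

import "fmt"

// tableDelta is the per-table bookkeeping left after removing column sizes:
// just a net row-count change and a modify count. (Illustrative shape only.)
type tableDelta struct {
	Delta int64
	Count int64
}

// txnCtx sketches the transaction context's delta map and the new
// three-argument UpdateDeltaForTable.
type txnCtx struct {
	tableDeltaMap map[int64]tableDelta
}

func (t *txnCtx) UpdateDeltaForTable(tableID, delta, count int64) {
	if t.tableDeltaMap == nil {
		t.tableDeltaMap = make(map[int64]tableDelta)
	}
	d := t.tableDeltaMap[tableID]
	d.Delta += delta
	d.Count += count
	t.tableDeltaMap[tableID] = d
}

func main() {
	var tc txnCtx
	// FlushTableStats-style usage: the affected row count feeds both counters.
	tc.UpdateDeltaForTable(57, 1000, 1000)
	fmt.Printf("table 57 delta: %+v\n", tc.tableDeltaMap[57])
}
```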