From b31316663ce2f01665d85993b65db1f2085f2d9b Mon Sep 17 00:00:00 2001 From: Rustin Date: Thu, 23 Jan 2025 18:06:05 +0800 Subject: [PATCH] statistics: optimize stats delta dumping with batch processing (#59150) ref pingcap/tidb#57869 --- pkg/disttask/importinto/proto.go | 2 - pkg/disttask/importinto/scheduler.go | 8 +- pkg/disttask/importinto/subtask_executor.go | 2 - pkg/disttask/importinto/task_executor.go | 2 - pkg/executor/delete.go | 2 +- pkg/executor/historical_stats_test.go | 36 +++ pkg/executor/importer/BUILD.bazel | 1 - pkg/executor/importer/engine_process.go | 5 +- pkg/executor/importer/import.go | 5 +- .../importer/importer_testkit_test.go | 8 +- pkg/executor/importer/kv_encode.go | 6 - pkg/executor/importer/progress.go | 49 ---- pkg/executor/importer/table_import.go | 16 +- pkg/executor/infoschema_reader_test.go | 39 ++- pkg/executor/internal/exec/executor.go | 2 +- pkg/executor/test/analyzetest/analyze_test.go | 35 ++- .../analyzetest/memorycontrol/BUILD.bazel | 1 + .../memorycontrol/memory_control_test.go | 4 +- pkg/lightning/backend/kv/context.go | 27 +- pkg/lightning/backend/kv/context_test.go | 13 +- pkg/lightning/backend/kv/session.go | 5 - pkg/planner/core/integration_test.go | 3 + pkg/planner/core/logical_plan_builder.go | 31 +- pkg/planner/core/logical_plans_test.go | 2 +- pkg/planner/indexadvisor/optimizer_test.go | 2 + pkg/server/handler/tests/BUILD.bazel | 1 + .../handler/tests/http_handler_serial_test.go | 4 +- pkg/session/session.go | 2 +- pkg/sessionctx/variable/session.go | 36 +-- pkg/sessionctx/variable/session_test.go | 51 ---- .../handle/autoanalyze/autoanalyze_test.go | 2 + .../priorityqueue/queue_ddl_handler_test.go | 6 +- .../autoanalyze/priorityqueue/queue_test.go | 18 +- .../handle/autoanalyze/refresher/BUILD.bazel | 1 + .../autoanalyze/refresher/refresher_test.go | 8 +- pkg/statistics/handle/ddl/subscriber.go | 10 +- .../handle/handletest/handle_test.go | 13 +- .../handle/handletest/statstest/BUILD.bazel | 2 +- .../handle/handletest/statstest/stats_test.go | 29 ++ pkg/statistics/handle/history/BUILD.bazel | 3 +- .../handle/history/history_stats.go | 67 +++-- .../handle/storage/stats_read_writer.go | 18 +- pkg/statistics/handle/storage/update.go | 134 ++++++--- pkg/statistics/handle/types/interfaces.go | 4 +- .../handle/updatetest/update_test.go | 10 +- pkg/statistics/handle/usage/BUILD.bazel | 2 + .../handle/usage/session_stats_collect.go | 275 ++++++++++++------ pkg/table/BUILD.bazel | 1 - pkg/table/table.go | 35 +-- pkg/table/tables/tables.go | 53 +--- pkg/table/tblctx/BUILD.bazel | 2 +- pkg/table/tblctx/buffers.go | 53 +--- pkg/table/tblctx/buffers_test.go | 32 -- pkg/table/tblctx/table.go | 2 +- pkg/table/tblsession/table.go | 4 +- pkg/table/tblsession/table_test.go | 3 +- 56 files changed, 549 insertions(+), 638 deletions(-) delete mode 100644 pkg/executor/importer/progress.go diff --git a/pkg/disttask/importinto/proto.go b/pkg/disttask/importinto/proto.go index 48378f44d8649..e8c9d3f9794e4 100644 --- a/pkg/disttask/importinto/proto.go +++ b/pkg/disttask/importinto/proto.go @@ -110,7 +110,6 @@ type SharedVars struct { TableImporter *importer.TableImporter DataEngine *backend.OpenedEngine IndexEngine *backend.OpenedEngine - Progress *importer.Progress mu sync.Mutex Checksum *verification.KVGroupChecksum @@ -183,5 +182,4 @@ type Checksum struct { // This portion of the code may be implemented uniformly in the framework in the future. type Result struct { LoadedRowCnt uint64 - ColSizeMap map[int64]int64 } diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go index 1dd1c0021ae7f..2567806b8063c 100644 --- a/pkg/disttask/importinto/scheduler.go +++ b/pkg/disttask/importinto/scheduler.go @@ -577,14 +577,9 @@ func updateResult(handle storage.TaskHandle, task *proto.Task, taskMeta *TaskMet } subtaskMetas = append(subtaskMetas, &subtaskMeta) } - columnSizeMap := make(map[int64]int64) for _, subtaskMeta := range subtaskMetas { taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt - for key, val := range subtaskMeta.Result.ColSizeMap { - columnSizeMap[key] += val - } } - taskMeta.Result.ColSizeMap = columnSizeMap if globalSort { taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, task) @@ -662,8 +657,7 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger func(ctx context.Context) (bool, error) { return true, taskHandle.WithNewSession(func(se sessionctx.Context) error { if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{ - Affected: taskMeta.Result.LoadedRowCnt, - ColSizeMap: taskMeta.Result.ColSizeMap, + Affected: taskMeta.Result.LoadedRowCnt, }); err != nil { logger.Warn("flush table stats failed", zap.Error(err)) } diff --git a/pkg/disttask/importinto/subtask_executor.go b/pkg/disttask/importinto/subtask_executor.go index 4978a2c09aaa9..3dcb7347ca589 100644 --- a/pkg/disttask/importinto/subtask_executor.go +++ b/pkg/disttask/importinto/subtask_executor.go @@ -69,7 +69,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr sharedVars.TableImporter, sharedVars.DataEngine, sharedVars.IndexEngine, - sharedVars.Progress, logger, checksum, ); err != nil { @@ -82,7 +81,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr sharedVars.TableImporter, dataWriter, indexWriter, - sharedVars.Progress, logger, checksum, ); err != nil { diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index 2c85f63c3785a..d118a99c42816 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -154,7 +154,6 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt TableImporter: s.tableImporter, DataEngine: dataEngine, IndexEngine: indexEngine, - Progress: importer.NewProgress(), Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()), SortedDataMeta: &external.SortedKVMeta{}, SortedIndexMetas: make(map[int64]*external.SortedKVMeta), @@ -251,7 +250,6 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt } subtaskMeta.Result = Result{ LoadedRowCnt: dataKVCount, - ColSizeMap: sharedVars.Progress.GetColSize(), } allocators := sharedVars.TableImporter.Allocators() subtaskMeta.MaxIDs = map[autoid.AllocatorType]int64{ diff --git a/pkg/executor/delete.go b/pkg/executor/delete.go index ea6d6861702e5..04c6a8efa2c51 100644 --- a/pkg/executor/delete.go +++ b/pkg/executor/delete.go @@ -274,7 +274,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl return err } - err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.ExtraPartialRowOption) + err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.IndexesRowLayout) if err != nil { return err } diff --git a/pkg/executor/historical_stats_test.go b/pkg/executor/historical_stats_test.go index c448eb628c10f..f639095c444f7 100644 --- a/pkg/executor/historical_stats_test.go +++ b/pkg/executor/historical_stats_test.go @@ -403,3 +403,39 @@ PARTITION p0 VALUES LESS THAN (6) require.NotNil(t, jt) require.False(t, jt.IsHistoricalStats) } + +func TestDumpHistoricalStatsMetaForMultiTables(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t1(a int, b varchar(10), index idx(a, b))") + tk.MustExec("create table t2(a int, b varchar(10), index idx(a, b))") + // Insert some data. + tk.MustExec("insert into t1 values (1, 'a'), (2, 'b'), (3, 'c')") + tk.MustExec("insert into t2 values (1, 'a'), (2, 'b'), (3, 'c')") + // Analyze the tables. + tk.MustExec("analyze table t1") + tk.MustExec("analyze table t2") + h := dom.StatsHandle() + // Update the stats cache. + require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) + + // Insert more data. + tk.MustExec("insert into t1 values (4, 'd'), (5, 'e'), (6, 'f')") + tk.MustExec("insert into t2 values (4, 'd'), (5, 'e'), (6, 'f')") + // Dump stats delta to kv. + require.NoError(t, h.DumpStatsDeltaToKV(true)) + + // Check historical stats meta. + tbl1, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t1")) + require.NoError(t, err) + tbl2, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t2")) + require.NoError(t, err) + rows := tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tbl1.Meta().ID).Rows() + version1 := rows[0][0].(string) + rows = tk.MustQuery("select version from mysql.stats_meta_history where table_id = ? order by version desc limit 1", tbl2.Meta().ID).Rows() + version2 := rows[0][0].(string) + require.Equal(t, version1, version2) +} diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel index 45affc02d7974..7bb132dcb4883 100644 --- a/pkg/executor/importer/BUILD.bazel +++ b/pkg/executor/importer/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "job.go", "kv_encode.go", "precheck.go", - "progress.go", "table_import.go", ], importpath = "github.com/pingcap/tidb/pkg/executor/importer", diff --git a/pkg/executor/importer/engine_process.go b/pkg/executor/importer/engine_process.go index 93e6ca802cfeb..8d00be701c367 100644 --- a/pkg/executor/importer/engine_process.go +++ b/pkg/executor/importer/engine_process.go @@ -30,7 +30,6 @@ func ProcessChunk( chunk *checkpoints.ChunkCheckpoint, tableImporter *TableImporter, dataEngine, indexEngine *backend.OpenedEngine, - progress *Progress, logger *zap.Logger, groupChecksum *verification.KVGroupChecksum, ) error { @@ -65,7 +64,7 @@ func ProcessChunk( } }() - return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum) + return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, logger, groupChecksum) } // ProcessChunkWithWriter processes a chunk, and write kv pairs to dataWriter and indexWriter. @@ -74,7 +73,6 @@ func ProcessChunkWithWriter( chunk *checkpoints.ChunkCheckpoint, tableImporter *TableImporter, dataWriter, indexWriter backend.EngineWriter, - progress *Progress, logger *zap.Logger, groupChecksum *verification.KVGroupChecksum, ) error { @@ -116,6 +114,5 @@ func ProcessChunkWithWriter( if err != nil { return err } - progress.AddColSize(encoder.GetColumnSize()) return nil } diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 6ef74ba91f0a3..b36598715309b 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -1395,9 +1395,8 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType { // JobImportResult is the result of the job import. type JobImportResult struct { - Affected uint64 - Warnings []contextutil.SQLWarn - ColSizeMap variable.DeltaColsMap + Affected uint64 + Warnings []contextutil.SQLWarn } // GetMsgFromBRError get msg from BR error. diff --git a/pkg/executor/importer/importer_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go index 12c395d4d8ec9..f5f07b3667418 100644 --- a/pkg/executor/importer/importer_testkit_test.go +++ b/pkg/executor/importer/importer_testkit_test.go @@ -314,11 +314,9 @@ func TestProcessChunkWith(t *testing.T) { defer ti.Backend().CloseEngineMgr() kvWriter := mock.NewMockEngineWriter(ctrl) kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - progress := importer.NewProgress() checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace) - err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum) + err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum) require.NoError(t, err) - require.Len(t, progress.GetColSize(), 3) checksumMap := checksum.GetInnerChecksums() require.Len(t, checksumMap, 1) require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID]) @@ -346,11 +344,9 @@ func TestProcessChunkWith(t *testing.T) { ti.SetSelectedRowCh(rowsCh) kvWriter := mock.NewMockEngineWriter(ctrl) kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - progress := importer.NewProgress() checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace) - err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum) + err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum) require.NoError(t, err) - require.Len(t, progress.GetColSize(), 3) checksumMap := checksum.GetInnerChecksums() require.Len(t, checksumMap, 1) require.Equal(t, verify.MakeKVChecksum(111, 3, 14585065391351463171), *checksumMap[verify.DataKVGroupID]) diff --git a/pkg/executor/importer/kv_encode.go b/pkg/executor/importer/kv_encode.go index 88d43c33409f8..a1c908ad0d4bf 100644 --- a/pkg/executor/importer/kv_encode.go +++ b/pkg/executor/importer/kv_encode.go @@ -35,8 +35,6 @@ import ( // KVEncoder encodes a row of data into a KV pair. type KVEncoder interface { Encode(row []types.Datum, rowID int64) (*kv.Pairs, error) - // GetColumnSize returns the size of each column in the current encoder. - GetColumnSize() map[int64]int64 io.Closer } @@ -91,10 +89,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err return en.Record2KV(record, row, rowID) } -func (en *tableKVEncoder) GetColumnSize() map[int64]int64 { - return en.SessionCtx.GetColumnSize(en.TableMeta().ID) -} - // todo merge with code in load_data.go func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) { row := make([]types.Datum, 0, len(en.insertColumns)) diff --git a/pkg/executor/importer/progress.go b/pkg/executor/importer/progress.go deleted file mode 100644 index 8ee0e882ef925..0000000000000 --- a/pkg/executor/importer/progress.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package importer - -import ( - "maps" - "sync" -) - -// Progress is the progress of the IMPORT INTO task. -type Progress struct { - colSizeMu sync.Mutex - colSizeMap map[int64]int64 -} - -// NewProgress creates a new Progress. -func NewProgress() *Progress { - return &Progress{ - colSizeMap: make(map[int64]int64), - } -} - -// AddColSize adds the size of the column to the progress. -func (p *Progress) AddColSize(colSizeMap map[int64]int64) { - p.colSizeMu.Lock() - defer p.colSizeMu.Unlock() - for key, value := range colSizeMap { - p.colSizeMap[key] += value - } -} - -// GetColSize returns the size of the column. -func (p *Progress) GetColSize() map[int64]int64 { - p.colSizeMu.Lock() - defer p.colSizeMu.Unlock() - return maps.Clone(p.colSizeMap) -} diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index eecdee38d96db..4a5d0fc34ac25 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -654,25 +654,20 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C } var ( - mu sync.Mutex - checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace) - colSizeMap = make(map[int64]int64) + mu sync.Mutex + checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace) ) eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx) for i := 0; i < ti.ThreadCnt; i++ { eg.Go(func() error { chunkCheckpoint := checkpoints.ChunkCheckpoint{} chunkChecksum := verify.NewKVGroupChecksumWithKeyspace(ti.keyspace) - progress := NewProgress() defer func() { mu.Lock() defer mu.Unlock() checksum.Add(chunkChecksum) - for k, v := range progress.GetColSize() { - colSizeMap[k] += v - } }() - return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, progress, ti.logger, chunkChecksum) + return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, ti.logger, chunkChecksum) }) } if err = eg.Wait(); err != nil { @@ -716,8 +711,7 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C } return &JobImportResult{ - Affected: uint64(dataKVCount), - ColSizeMap: colSizeMap, + Affected: uint64(dataKVCount), }, nil } @@ -976,7 +970,7 @@ func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64, sessionVars := se.GetSessionVars() sessionVars.TxnCtxMu.Lock() defer sessionVars.TxnCtxMu.Unlock() - sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected), result.ColSizeMap) + sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected)) se.StmtCommit(ctx) return se.CommitTxn(ctx) } diff --git a/pkg/executor/infoschema_reader_test.go b/pkg/executor/infoschema_reader_test.go index 2e0db16455914..704ab1a8a0eaa 100644 --- a/pkg/executor/infoschema_reader_test.go +++ b/pkg/executor/infoschema_reader_test.go @@ -182,20 +182,23 @@ func TestDataForTableStatsField(t *testing.T) { require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( - testkit.Rows("3 18 54 6")) + testkit.Rows("3 16 48 0")) tk.MustExec(`insert into t(c, d, e) values(4, 5, "f")`) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( - testkit.Rows("4 18 72 8")) + testkit.Rows("4 16 64 0")) tk.MustExec("delete from t where c >= 3") require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( - testkit.Rows("2 18 36 4")) + testkit.Rows("2 16 32 0")) tk.MustExec("delete from t where c=3") require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) + tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( + testkit.Rows("2 16 32 0")) + tk.MustExec("analyze table t all columns") tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( testkit.Rows("2 18 36 4")) @@ -207,6 +210,9 @@ func TestDataForTableStatsField(t *testing.T) { tk.MustExec(`insert into t(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e")`) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) + tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( + testkit.Rows("3 16 48 0")) + tk.MustExec("analyze table t all columns") tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( testkit.Rows("3 18 54 6")) } @@ -227,23 +233,24 @@ func TestPartitionsTable(t *testing.T) { tk.MustExec(`insert into test_partitions(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`) tk.MustQuery("select PARTITION_NAME, PARTITION_DESCRIPTION from information_schema.PARTITIONS where table_name='test_partitions';").Check( - testkit.Rows("" + - "p0 6]\n" + - "[p1 11]\n" + - "[p2 16")) + testkit.Rows("p0 6", "p1 11", "p2 16")) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.PARTITIONS where table_name='test_partitions';").Check( - testkit.Rows("" + - "0 0 0 0]\n" + - "[0 0 0 0]\n" + - "[0 0 0 0")) + testkit.Rows( + "0 0 0 0", + "0 0 0 0", + "0 0 0 0", + ), + ) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.PARTITIONS where table_name='test_partitions';").Check( - testkit.Rows("" + - "1 18 18 2]\n" + - "[1 18 18 2]\n" + - "[1 18 18 2")) + testkit.Rows( + "1 16 16 0", + "1 16 16 0", + "1 16 16 0", + ), + ) }) // Test for table has no partitions. @@ -255,7 +262,7 @@ func TestPartitionsTable(t *testing.T) { require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) tk.MustQuery("select PARTITION_NAME, TABLE_ROWS, AVG_ROW_LENGTH, DATA_LENGTH, INDEX_LENGTH from information_schema.PARTITIONS where table_name='test_partitions_1';").Check( - testkit.Rows(" 3 18 54 6")) + testkit.Rows(" 3 16 48 0")) tk.MustExec("DROP TABLE IF EXISTS `test_partitions`;") tk.MustExec(`CREATE TABLE test_partitions1 (id int, b int, c varchar(5), primary key(id), index idx(c)) PARTITION BY RANGE COLUMNS(id) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16));`) diff --git a/pkg/executor/internal/exec/executor.go b/pkg/executor/internal/exec/executor.go index 8dce75e47909b..82081ff8a4e75 100644 --- a/pkg/executor/internal/exec/executor.go +++ b/pkg/executor/internal/exec/executor.go @@ -378,7 +378,7 @@ func (e *BaseExecutor) Ctx() sessionctx.Context { // UpdateDeltaForTableID updates the delta info for the table with tableID. func (e *BaseExecutor) UpdateDeltaForTableID(id int64) { txnCtx := e.ctx.GetSessionVars().TxnCtx - txnCtx.UpdateDeltaForTable(id, 0, 0, nil) + txnCtx.UpdateDeltaForTable(id, 0, 0) } // GetSysSession gets a system session context from executor. diff --git a/pkg/executor/test/analyzetest/analyze_test.go b/pkg/executor/test/analyzetest/analyze_test.go index 669720ff30d48..14cc0793af312 100644 --- a/pkg/executor/test/analyzetest/analyze_test.go +++ b/pkg/executor/test/analyzetest/analyze_test.go @@ -1146,6 +1146,7 @@ func TestAnalyzeColumnsWithPrimaryKey(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("create table t (a int, b int, c int primary key)") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t values (1,1,1), (1,1,2), (2,2,3), (2,2,4), (3,3,5), (4,3,6), (5,4,7), (6,4,8), (null,null,9)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1189,7 +1190,7 @@ func TestAnalyzeColumnsWithPrimaryKey(t *testing.T) { "test t c 0 2 1")) tk.MustQuery(fmt.Sprintf("select is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms where table_id = %d", tblID)).Sort().Check( testkit.Rows("0 1 6 1 8 2 1", - "0 2 0 0 8 0 0", // column b is not analyzed + "0 2 0 0 0 0 0", // column b is not analyzed "0 3 9 0 9 2 1", )) tk.MustQuery("show stats_buckets where db_name = 'test' and table_name = 't'").Sort().Check( @@ -1213,6 +1214,7 @@ func TestAnalyzeColumnsWithIndex(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("create table t (a int, b int, c int, d int, index idx_b_d(b, d))") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t values (1,1,null,1), (2,1,9,1), (1,1,8,1), (2,2,7,2), (1,3,7,3), (2,4,6,4), (1,4,6,5), (2,4,6,5), (1,5,6,5)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1260,7 +1262,7 @@ func TestAnalyzeColumnsWithIndex(t *testing.T) { "test t idx_b_d 1 (1, 1) 3", "test t idx_b_d 1 (4, 5) 2")) tk.MustQuery(fmt.Sprintf("select is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms where table_id = %d", tblID)).Sort().Check( - testkit.Rows("0 1 0 0 9 0 0", // column a is not analyzed + testkit.Rows("0 1 0 0 0 0 0", // column a is not analyzed "0 2 5 0 9 2 1", "0 3 4 1 8 2 -0.07", "0 4 5 0 9 2 1", @@ -1289,6 +1291,7 @@ func TestAnalyzeColumnsWithClusteredIndex(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("create table t (a int, b int, c int, d int, primary key(b, d) clustered)") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t values (1,1,null,1), (2,2,9,2), (1,3,8,3), (2,4,7,4), (1,5,7,5), (2,6,6,6), (1,7,6,7), (2,8,6,8), (1,9,6,9)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1336,7 +1339,7 @@ func TestAnalyzeColumnsWithClusteredIndex(t *testing.T) { "test t d 0 1 1", "test t d 0 2 1")) tk.MustQuery(fmt.Sprintf("select is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms where table_id = %d", tblID)).Sort().Check( - testkit.Rows("0 1 0 0 9 0 0", // column a is not analyzed + testkit.Rows("0 1 0 0 0 0 0", // column a is not analyzed "0 2 9 0 9 2 1", "0 3 4 1 8 2 -0.07", "0 4 9 0 9 2 1", @@ -1366,6 +1369,7 @@ func TestAnalyzeColumnsWithDynamicPartitionTable(t *testing.T) { tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") tk.MustExec("create table t (a int, b int, c int, index idx(c)) partition by range (a) (partition p0 values less than (10), partition p1 values less than maxvalue)") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t values (1,2,1), (2,4,1), (3,6,1), (4,8,2), (4,8,2), (5,10,3), (5,10,4), (5,10,5), (null,null,6), (11,22,7), (12,24,8), (13,26,9), (14,28,10), (15,30,11), (16,32,12), (16,32,13), (16,32,13), (16,32,14), (17,34,14), (17,34,14)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1462,15 +1466,16 @@ func TestAnalyzeColumnsWithDynamicPartitionTable(t *testing.T) { "test t p1 idx 1 1 6 1 11 12 0")) tk.MustQuery("select table_id, is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms order by table_id, is_index, hist_id asc").Check( - testkit.Rows(fmt.Sprintf("%d 0 1 12 1 19 2 0", tblID), // global, a + testkit.Rows(fmt.Sprintf("%d 0 1 12 1 19 2 0", tblID), // global, aA + fmt.Sprintf("%d 0 2 0 0 0 0 0", tblID), // global, b, not analyzed fmt.Sprintf("%d 0 3 14 0 20 2 0", tblID), // global, c fmt.Sprintf("%d 1 1 14 0 0 2 0", tblID), // global, idx fmt.Sprintf("%d 0 1 5 1 8 2 1", p0ID), // p0, a - fmt.Sprintf("%d 0 2 0 0 8 0 0", p0ID), // p0, b, not analyzed + fmt.Sprintf("%d 0 2 0 0 0 0 0", p0ID), // p0, b, not analyzed fmt.Sprintf("%d 0 3 6 0 9 2 1", p0ID), // p0, c fmt.Sprintf("%d 1 1 6 0 9 2 0", p0ID), // p0, idx fmt.Sprintf("%d 0 1 7 0 11 2 1", p1ID), // p1, a - fmt.Sprintf("%d 0 2 0 0 11 0 0", p1ID), // p1, b, not analyzed + fmt.Sprintf("%d 0 2 0 0 0 0 0", p1ID), // p1, b, not analyzed fmt.Sprintf("%d 0 3 8 0 11 2 1", p1ID), // p1, c fmt.Sprintf("%d 1 1 8 0 11 2 0", p1ID), // p1, idx )) @@ -1490,12 +1495,14 @@ func TestAnalyzeColumnsWithStaticPartitionTable(t *testing.T) { tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("set @@tidb_partition_prune_mode = 'static'") tk.MustExec("create table t (a int, b int, c int, index idx(c)) partition by range (a) (partition p0 values less than (10), partition p1 values less than maxvalue)") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t values (1,2,1), (2,4,1), (3,6,1), (4,8,2), (4,8,2), (5,10,3), (5,10,4), (5,10,5), (null,null,6), (11,22,7), (12,24,8), (13,26,9), (14,28,10), (15,30,11), (16,32,12), (16,32,13), (16,32,13), (16,32,14), (17,34,14), (17,34,14)") require.NoError(t, h.DumpStatsDeltaToKV(true)) is := dom.InfoSchema() tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) + tblID := tbl.Meta().ID defs := tbl.Meta().Partition.Definitions p0ID := defs[0].ID p1ID := defs[1].ID @@ -1570,12 +1577,16 @@ func TestAnalyzeColumnsWithStaticPartitionTable(t *testing.T) { "test t p1 idx 1 1 6 1 11 12 0")) tk.MustQuery("select table_id, is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms order by table_id, is_index, hist_id asc").Check( - testkit.Rows(fmt.Sprintf("%d 0 1 5 1 8 2 1", p0ID), // p0, a - fmt.Sprintf("%d 0 2 0 0 8 0 0", p0ID), // p0, b, not analyzed + testkit.Rows(fmt.Sprintf("%d 0 1 0 0 0 0 0", tblID), // global, a, not analyzed + fmt.Sprintf("%d 0 2 0 0 0 0 0", tblID), // global, b, not analyzed + fmt.Sprintf("%d 0 3 0 0 0 0 0", tblID), // global, c, not analyzed + fmt.Sprintf("%d 1 1 0 0 0 0 0", tblID), // global, idx, not analyzed + fmt.Sprintf("%d 0 1 5 1 8 2 1", p0ID), // p0, a + fmt.Sprintf("%d 0 2 0 0 0 0 0", p0ID), // p0, b, not analyzed fmt.Sprintf("%d 0 3 6 0 9 2 1", p0ID), // p0, c fmt.Sprintf("%d 1 1 6 0 9 2 0", p0ID), // p0, idx fmt.Sprintf("%d 0 1 7 0 11 2 1", p1ID), // p1, a - fmt.Sprintf("%d 0 2 0 0 11 0 0", p1ID), // p1, b, not analyzed + fmt.Sprintf("%d 0 2 0 0 0 0 0", p1ID), // p1, b, not analyzed fmt.Sprintf("%d 0 3 8 0 11 2 1", p1ID), // p1, c fmt.Sprintf("%d 1 1 8 0 11 2 0", p1ID), // p1, idx )) @@ -1595,6 +1606,7 @@ func TestAnalyzeColumnsWithExtendedStats(t *testing.T) { tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("set @@tidb_enable_extended_stats = on") tk.MustExec("create table t (a int, b int, c int)") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("alter table t add stats_extended s1 correlation(b,c)") tk.MustExec("insert into t values (5,1,1), (4,2,2), (3,3,3), (2,4,4), (1,5,5)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1637,7 +1649,7 @@ func TestAnalyzeColumnsWithExtendedStats(t *testing.T) { "test t c 0 1 1", "test t c 0 2 1")) tk.MustQuery(fmt.Sprintf("select is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms where table_id = %d", tblID)).Sort().Check( - testkit.Rows("0 1 0 0 5 0 0", // column a is not analyzed + testkit.Rows("0 1 0 0 0 0 0", // column a is not analyzed "0 2 5 0 5 2 1", "0 3 5 0 5 2 1", )) @@ -1665,6 +1677,7 @@ func TestAnalyzeColumnsWithVirtualColumnIndex(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("create table t (a int, b int, c int as (b+1), index idx(c))") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t (a,b) values (1,1), (2,2), (3,3), (4,4), (5,4), (6,5), (7,5), (8,5), (null,null)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1781,7 +1794,7 @@ func TestAnalyzeColumnsAfterAnalyzeAll(t *testing.T) { "test t b 0 1 3", "test t b 0 2 2")) tk.MustQuery(fmt.Sprintf("select is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms where table_id = %d", tblID)).Sort().Check( - testkit.Rows("0 1 4 0 8 2 1", // tot_col_size of column a is updated to 8 by DumpStatsDeltaToKV + testkit.Rows("0 1 4 0 6 2 1", "0 2 5 0 8 2 0.76")) tk.MustQuery("show stats_buckets where db_name = 'test' and table_name = 't'").Sort().Check( // db, tbl, part, col, is_index, bucket_id, count, repeats, lower, upper, ndv diff --git a/pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel b/pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel index 75d48f9dd113b..83de820219af5 100644 --- a/pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel +++ b/pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel @@ -15,6 +15,7 @@ go_test( "//pkg/executor", "//pkg/sessionctx/variable", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/testkit", "//pkg/util", "@com_github_pingcap_failpoint//:failpoint", diff --git a/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go b/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go index 1982e8a974871..3edb4c520a8a9 100644 --- a/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go +++ b/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/executor" "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util" "github.com/stretchr/testify/require" @@ -134,6 +135,8 @@ func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t(a int)") + h := dom.StatsHandle() + testutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t select 1") for i := 1; i <= 8; i++ { tk.MustExec("insert into t select * from t") // 256 Lines @@ -143,7 +146,6 @@ func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) { rs0 := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed") require.Len(t, rs0.Rows(), 0) - h := dom.StatsHandle() originalVal4 := statistics.AutoAnalyzeMinCnt originalVal5 := tk.MustQuery("select @@global.tidb_auto_analyze_ratio").Rows()[0][0].(string) statistics.AutoAnalyzeMinCnt = 0 diff --git a/pkg/lightning/backend/kv/context.go b/pkg/lightning/backend/kv/context.go index 14c1963723c21..6af19687eef53 100644 --- a/pkg/lightning/backend/kv/context.go +++ b/pkg/lightning/backend/kv/context.go @@ -15,9 +15,7 @@ package kv import ( - "maps" "math/rand" - "sync" "time" "github.com/pingcap/tidb/pkg/errctx" @@ -122,11 +120,6 @@ type litTableMutateContext struct { reservedRowIDAlloc stmtctx.ReservedRowIDAlloc enableMutationChecker bool assertionLevel variable.AssertionLevel - tableDelta struct { - sync.Mutex - // tblID -> (colID -> deltaSize) - m map[int64]map[int64]int64 - } } // AlternativeAllocators implements the `table.MutateContext` interface. @@ -188,25 +181,9 @@ func (ctx *litTableMutateContext) GetStatisticsSupport() (tblctx.StatisticsSuppo } // UpdatePhysicalTableDelta implements the `table.StatisticsSupport` interface. -func (ctx *litTableMutateContext) UpdatePhysicalTableDelta( - physicalTableID int64, _ int64, - _ int64, cols variable.DeltaCols, +func (*litTableMutateContext) UpdatePhysicalTableDelta( + _, _, _ int64, ) { - ctx.tableDelta.Lock() - defer ctx.tableDelta.Unlock() - if ctx.tableDelta.m == nil { - ctx.tableDelta.m = make(map[int64]map[int64]int64) - } - tableMap := ctx.tableDelta.m - colSize := tableMap[physicalTableID] - tableMap[physicalTableID] = cols.UpdateColSizeMap(colSize) -} - -// GetColumnSize returns the colum size map (colID -> deltaSize) for the given table ID. -func (ctx *litTableMutateContext) GetColumnSize(tblID int64) (ret map[int64]int64) { - ctx.tableDelta.Lock() - defer ctx.tableDelta.Unlock() - return maps.Clone(ctx.tableDelta.m[tblID]) } // GetCachedTableSupport implements the `table.MutateContext` interface. diff --git a/pkg/lightning/backend/kv/context_test.go b/pkg/lightning/backend/kv/context_test.go index bb04a4f3d5bbe..fd6d3b5d83406 100644 --- a/pkg/lightning/backend/kv/context_test.go +++ b/pkg/lightning/backend/kv/context_test.go @@ -250,17 +250,8 @@ func TestLitTableMutateContext(t *testing.T) { stats, ok := tblCtx.GetStatisticsSupport() require.True(t, ok) // test for `UpdatePhysicalTableDelta` and `GetColumnSize` - stats.UpdatePhysicalTableDelta(123, 5, 2, variable.DeltaColsMap{1: 2, 3: 4}) - r := tblCtx.GetColumnSize(123) - require.Equal(t, map[int64]int64{1: 2, 3: 4}, r) - stats.UpdatePhysicalTableDelta(123, 8, 2, variable.DeltaColsMap{3: 5, 4: 3}) - r = tblCtx.GetColumnSize(123) - require.Equal(t, map[int64]int64{1: 2, 3: 9, 4: 3}, r) - // the result should be a cloned value - r[1] = 100 - require.Equal(t, map[int64]int64{1: 2, 3: 9, 4: 3}, tblCtx.GetColumnSize(123)) - // test gets a non-existed table - require.Empty(t, tblCtx.GetColumnSize(456)) + stats.UpdatePhysicalTableDelta(123, 5, 2) + stats.UpdatePhysicalTableDelta(123, 8, 2) } // test for default diff --git a/pkg/lightning/backend/kv/session.go b/pkg/lightning/backend/kv/session.go index bda638da38609..b2cd3d540c7bf 100644 --- a/pkg/lightning/backend/kv/session.go +++ b/pkg/lightning/backend/kv/session.go @@ -363,11 +363,6 @@ func (s *Session) UnsetUserVar(varName string) { s.exprCtx.unsetUserVar(varName) } -// GetColumnSize returns the size of each column. -func (s *Session) GetColumnSize(tblID int64) (ret map[int64]int64) { - return s.tblCtx.GetColumnSize(tblID) -} - // Close closes the session func (s *Session) Close() { memBuf := &s.txn.MemBuf diff --git a/pkg/planner/core/integration_test.go b/pkg/planner/core/integration_test.go index 38837cc33b38e..c653a4e67e6b8 100644 --- a/pkg/planner/core/integration_test.go +++ b/pkg/planner/core/integration_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util" @@ -2165,6 +2166,7 @@ func TestIssue48257(t *testing.T) { // 1. test sync load tk.MustExec("create table t(a int)") + testutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t value(1)") require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) @@ -2190,6 +2192,7 @@ func TestIssue48257(t *testing.T) { // 2. test async load tk.MustExec("set tidb_stats_load_sync_wait = 0") tk.MustExec("create table t1(a int)") + testutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t1 value(1)") require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) diff --git a/pkg/planner/core/logical_plan_builder.go b/pkg/planner/core/logical_plan_builder.go index 0d311aeb4b1a6..c283311793926 100644 --- a/pkg/planner/core/logical_plan_builder.go +++ b/pkg/planner/core/logical_plan_builder.go @@ -5205,7 +5205,9 @@ type TblColPosInfo struct { // HandleOrdinal represents the ordinal of the handle column. HandleCols util.HandleCols - *table.ExtraPartialRowOption + // IndexesRowLayout store the row layout of indexes. We need it if column pruning happens. + // If it's nil, means no column pruning happens. + IndexesRowLayout table.IndexesLayout } // MemoryUsage return the memory usage of TblColPosInfo @@ -5349,10 +5351,9 @@ func initColPosInfo(tid int64, names []*types.FieldName, handleCol util.HandleCo return TblColPosInfo{}, err } return TblColPosInfo{ - TblID: tid, - Start: offset, - HandleCols: handleCol, - ExtraPartialRowOption: &table.ExtraPartialRowOption{}, + TblID: tid, + Start: offset, + HandleCols: handleCol, }, nil } @@ -5380,7 +5381,6 @@ func pruneAndBuildSingleTableColPosInfoForDelete( // Columns can be seen by DELETE are the deletable columns. deletableCols := t.DeletableCols() deletableIdxs := t.DeletableIndices() - publicCols := t.Cols() tblLen := len(deletableCols) // Fix the start position of the columns. @@ -5415,25 +5415,6 @@ func pruneAndBuildSingleTableColPosInfoForDelete( fixedPos[i] = i - pruned } - // Fill in the ColumnSizes. - colPosInfo.ColumnsSizeHelper = &table.ColumnsSizeHelper{ - NotPruned: bitset.New(uint(len(publicCols))), - AvgSizes: make([]float64, 0, len(publicCols)), - PublicColsLayout: make([]int, 0, len(publicCols)), - } - colPosInfo.ColumnsSizeHelper.NotPruned.SetAll() - for i, col := range publicCols { - // If the column is not pruned, we can use the column data to get a more accurate size. - // We just need to record its position info. - if _, ok := fixedPos[col.Offset]; ok { - colPosInfo.ColumnsSizeHelper.PublicColsLayout = append(colPosInfo.ColumnsSizeHelper.PublicColsLayout, fixedPos[col.Offset]) - continue - } - // Otherwise we need to get the average size of the column by its field type. - // TODO: use statistics to get a maybe more accurate size. - colPosInfo.ColumnsSizeHelper.NotPruned.Clear(uint(i)) - colPosInfo.ColumnsSizeHelper.AvgSizes = append(colPosInfo.ColumnsSizeHelper.AvgSizes, float64(chunk.EstimateTypeWidth(&col.FieldType))) - } // Fix the index layout and fill in table.IndexRowLayoutOption. indexColMap := make(map[int64]table.IndexRowLayoutOption, len(deletableIdxs)) for _, idx := range deletableIdxs { diff --git a/pkg/planner/core/logical_plans_test.go b/pkg/planner/core/logical_plans_test.go index c05f5003892a9..dffe3b91fa200 100644 --- a/pkg/planner/core/logical_plans_test.go +++ b/pkg/planner/core/logical_plans_test.go @@ -2538,7 +2538,7 @@ func TestPruneColumnsForDelete(t *testing.T) { ret := make([][]string, 0, len(deletePlan.TblColPosInfos)) for _, colsLayout := range deletePlan.TblColPosInfos { innerRet := make([]string, 0, len(colsLayout.IndexesRowLayout)*2+2) - if colsLayout.ExtraPartialRowOption.IndexesRowLayout == nil { + if colsLayout.IndexesRowLayout == nil { sb.Reset() fmt.Fprintf(&sb, "no column-pruning happened") innerRet = append(innerRet, sb.String()) diff --git a/pkg/planner/indexadvisor/optimizer_test.go b/pkg/planner/indexadvisor/optimizer_test.go index 69af075d041ef..468cc1865611a 100644 --- a/pkg/planner/indexadvisor/optimizer_test.go +++ b/pkg/planner/indexadvisor/optimizer_test.go @@ -193,6 +193,7 @@ func TestOptimizerEstIndexSize(t *testing.T) { tk.MustExec(`insert into t values (1, space(32))`) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) + tk.MustExec("analyze table t all columns") s, err = opt.EstIndexSize("test", "t", "a") require.NoError(t, err) require.Equal(t, float64(1), s) @@ -208,6 +209,7 @@ func TestOptimizerEstIndexSize(t *testing.T) { tk.MustExec(`insert into t values (1, space(64))`) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) + tk.MustExec("analyze table t all columns") s, err = opt.EstIndexSize("test", "t", "a") require.NoError(t, err) require.Equal(t, float64(2), s) // 2 rows diff --git a/pkg/server/handler/tests/BUILD.bazel b/pkg/server/handler/tests/BUILD.bazel index 36af17e644c4b..b09d490f19320 100644 --- a/pkg/server/handler/tests/BUILD.bazel +++ b/pkg/server/handler/tests/BUILD.bazel @@ -36,6 +36,7 @@ go_test( "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", + "//pkg/statistics/handle/ddl/testutil", "//pkg/store/helper", "//pkg/store/mockstore", "//pkg/store/mockstore/unistore", diff --git a/pkg/server/handler/tests/http_handler_serial_test.go b/pkg/server/handler/tests/http_handler_serial_test.go index f9e9a6b3e30b8..8fccfa8e4f527 100644 --- a/pkg/server/handler/tests/http_handler_serial_test.go +++ b/pkg/server/handler/tests/http_handler_serial_test.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/pkg/server/handler/tikvhandler" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/deadlockhistory" "github.com/pingcap/tidb/pkg/util/versioninfo" @@ -592,6 +593,7 @@ func TestGetSchemaStorage(t *testing.T) { tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (c int, d int, e char(5), index idx(e))") + testutil.HandleNextDDLEventWithTxn(h) tk.MustExec(`insert into t(c, d, e) values(1, 2, "c"), (2, 3, "d"), (3, 4, "e")`) h.FlushStats() @@ -611,7 +613,7 @@ func TestGetSchemaStorage(t *testing.T) { sort.Strings(names) require.Equal(t, expects, names) - require.Equal(t, []int64{3, 18, 54, 0, 6, 0}, []int64{ + require.Equal(t, []int64{3, 16, 48, 0, 0, 0}, []int64{ tables[0].TableRows, tables[0].AvgRowLength, tables[0].DataLength, diff --git a/pkg/session/session.go b/pkg/session/session.go index fa0e562083ce9..e7f601457ce58 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -907,7 +907,7 @@ func (s *session) updateStatsDeltaToCollector() { if s.statsCollector != nil && mapper != nil { for _, item := range mapper { if item.TableID > 0 { - s.statsCollector.Update(item.TableID, item.Delta, item.Count, &item.ColSize) + s.statsCollector.Update(item.TableID, item.Delta, item.Count) } } } diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index ee34ec5f04577..dac5c77532a48 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -340,38 +340,13 @@ func (tc *TransactionContext) CollectUnchangedKeysForLock(buf []kv.Key) []kv.Key return buf } -// ColSize is a data struct to store the delta information for a table. -type ColSize struct { - ColID int64 - Size int64 -} - -// DeltaCols is used to update the delta size for cols. -type DeltaCols interface { - // UpdateColSizeMap is used to update delta map for cols. - UpdateColSizeMap(m map[int64]int64) map[int64]int64 -} - -// DeltaColsMap implements DeltaCols -type DeltaColsMap map[int64]int64 - -// UpdateColSizeMap implements DeltaCols -func (cols DeltaColsMap) UpdateColSizeMap(m map[int64]int64) map[int64]int64 { - if m == nil && len(cols) > 0 { - m = make(map[int64]int64, len(cols)) - } - for colID, size := range cols { - m[colID] += size - } - return m -} - // UpdateDeltaForTable updates the delta info for some table. // The `cols` argument is used to update the delta size for cols. // If `cols` is nil, it means that the delta size for cols is not changed. func (tc *TransactionContext) UpdateDeltaForTable( - physicalTableID int64, delta int64, - count int64, cols DeltaCols, + physicalTableID int64, + delta int64, + count int64, ) { tc.tdmLock.Lock() defer tc.tdmLock.Unlock() @@ -382,9 +357,6 @@ func (tc *TransactionContext) UpdateDeltaForTable( item.Delta += delta item.Count += count item.TableID = physicalTableID - if cols != nil { - item.ColSize = cols.UpdateColSizeMap(item.ColSize) - } tc.TableDeltaMap[physicalTableID] = item } @@ -2908,7 +2880,6 @@ func (s *SessionVars) SetResourceGroupName(groupName string) { type TableDelta struct { Delta int64 Count int64 - ColSize map[int64]int64 InitTime time.Time // InitTime is the time that this delta is generated. TableID int64 } @@ -2918,7 +2889,6 @@ func (td TableDelta) Clone() TableDelta { return TableDelta{ Delta: td.Delta, Count: td.Count, - ColSize: maps.Clone(td.ColSize), InitTime: td.InitTime, TableID: td.TableID, } diff --git a/pkg/sessionctx/variable/session_test.go b/pkg/sessionctx/variable/session_test.go index 407b03eed582a..f4b6fac0144ed 100644 --- a/pkg/sessionctx/variable/session_test.go +++ b/pkg/sessionctx/variable/session_test.go @@ -334,14 +334,11 @@ func TestTableDeltaClone(t *testing.T) { td0 := variable.TableDelta{ Delta: 1, Count: 2, - ColSize: map[int64]int64{1: 1, 2: 2}, InitTime: time.Now(), TableID: 5, } td1 := td0.Clone() require.Equal(t, td0, td1) - td0.ColSize[3] = 3 - require.NotEqual(t, td0, td1) td2 := td0.Clone() require.Equal(t, td0, td2) @@ -356,7 +353,6 @@ func TestTransactionContextSavepoint(t *testing.T) { 1: { Delta: 1, Count: 2, - ColSize: map[int64]int64{1: 1}, InitTime: time.Now(), TableID: 5, }, @@ -375,11 +371,9 @@ func TestTransactionContextSavepoint(t *testing.T) { require.False(t, succ) require.Equal(t, 1, len(tc.Savepoints)) - tc.TableDeltaMap[1].ColSize[2] = 2 tc.TableDeltaMap[2] = variable.TableDelta{ Delta: 6, Count: 7, - ColSize: map[int64]int64{8: 8}, InitTime: time.Now(), TableID: 9, } @@ -389,7 +383,6 @@ func TestTransactionContextSavepoint(t *testing.T) { tc.AddSavepoint("S2", nil) require.Equal(t, 2, len(tc.Savepoints)) require.Equal(t, 1, len(tc.Savepoints[0].TxnCtxSavepoint.TableDeltaMap)) - require.Equal(t, 1, len(tc.Savepoints[0].TxnCtxSavepoint.TableDeltaMap[1].ColSize)) require.Equal(t, "s1", tc.Savepoints[0].Name) require.Equal(t, 2, len(tc.Savepoints[1].TxnCtxSavepoint.TableDeltaMap)) require.Equal(t, "s2", tc.Savepoints[1].Name) @@ -397,7 +390,6 @@ func TestTransactionContextSavepoint(t *testing.T) { tc.TableDeltaMap[3] = variable.TableDelta{ Delta: 10, Count: 11, - ColSize: map[int64]int64{12: 12}, InitTime: time.Now(), TableID: 13, } @@ -559,49 +551,6 @@ func TestSetStatus(t *testing.T) { require.Equal(t, mysql.ServerStatusAutocommit|mysql.ServerStatusCursorExists, sv.Status()) } -func TestMapDeltaCols(t *testing.T) { - for _, c := range []struct { - m map[int64]int64 - cols variable.DeltaColsMap - r map[int64]int64 - }{ - {}, - { - cols: map[int64]int64{1: 2}, - r: map[int64]int64{1: 2}, - }, - { - m: map[int64]int64{1: 2}, - r: map[int64]int64{1: 2}, - }, - { - m: map[int64]int64{1: 3, 3: 5, 5: 7}, - cols: map[int64]int64{1: 2, 3: -4, 6: 8}, - r: map[int64]int64{1: 5, 3: 1, 5: 7, 6: 8}, - }, - } { - originalCols := make(map[int64]int64) - for k, v := range c.cols { - originalCols[k] = v - } - - m2 := c.cols.UpdateColSizeMap(c.m) - require.Equal(t, c.r, m2) - if c.m == nil { - if len(c.cols) == 0 { - require.Nil(t, m2) - } - } else { - require.Equal(t, m2, c.m) - } - - if c.cols != nil { - // deltaCols not change - require.Equal(t, originalCols, map[int64]int64(c.cols)) - } - } -} - func TestRowIDShardGenerator(t *testing.T) { g := variable.NewRowIDShardGenerator(rand.New(rand.NewSource(12345)), 128) // #nosec G404) // default settings diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze_test.go b/pkg/statistics/handle/autoanalyze/autoanalyze_test.go index fd18834fdfffb..c64483c7725c6 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze_test.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze_test.go @@ -358,6 +358,7 @@ func TestAutoAnalyzeSkipColumnTypes(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t(a int, b int, c json, d text, e mediumtext, f blob, g mediumblob, index idx(d(10)))") + statstestutil.HandleNextDDLEventWithTxn(dom.StatsHandle()) tk.MustExec("insert into t values (1, 2, null, 'xxx', 'yyy', null, null)") tk.MustExec("select * from t where a = 1 and b = 1 and c = '1'") h := dom.StatsHandle() @@ -430,6 +431,7 @@ func TestAutoAnalyzeOutOfSpecifiedTime(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t (a int)") + statstestutil.HandleNextDDLEventWithTxn(dom.StatsHandle()) // to pass the stats.Pseudo check in autoAnalyzeTable tk.MustExec("analyze table t") // to pass the AutoAnalyzeMinCnt check in autoAnalyzeTable diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go index e5188078af7dd..1bfb65e398fd3 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go @@ -67,6 +67,7 @@ func TestHandleDDLEventsWithRunningJobs(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") statistics.AutoAnalyzeMinCnt = 0 defer func() { @@ -174,6 +175,7 @@ func TestTruncateTable(t *testing.T) { require.NoError(t, err) tableInfo := tbl.Meta() h := do.StatsHandle() + statstestutil.HandleNextDDLEventWithTxn(h) // Insert some data. testKit.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -287,6 +289,7 @@ func TestDropTable(t *testing.T) { require.NoError(t, err) tableInfo := tbl.Meta() h := do.StatsHandle() + statstestutil.HandleNextDDLEventWithTxn(h) // Insert some data. testKit.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -825,11 +828,12 @@ func TestDropSchemaEventWithDynamicPartition(t *testing.T) { func TestDropSchemaEventWithStaticPartition(t *testing.T) { store, do := testkit.CreateMockStoreAndDomain(t) + h := do.StatsHandle() testKit := testkit.NewTestKit(t, store) testKit.MustExec("use test") testKit.MustExec("create table t (c1 int, c2 int, index idx(c1, c2)) partition by range columns (c1) (partition p0 values less than (5), partition p1 values less than (10))") + statstestutil.HandleNextDDLEventWithTxn(h) testKit.MustExec("set global tidb_partition_prune_mode='static'") - h := do.StatsHandle() // Insert some data. testKit.MustExec("insert into t values (1,2),(6,6)") require.NoError(t, h.DumpStatsDeltaToKV(true)) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go index f12fc7df42378..89b4fd27eec68 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go @@ -68,8 +68,11 @@ func TestAnalysisPriorityQueue(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") + handle := dom.StatsHandle() tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") tk.MustExec("insert into t2 values (1)") statistics.AutoAnalyzeMinCnt = 0 @@ -78,7 +81,6 @@ func TestAnalysisPriorityQueue(t *testing.T) { }() ctx := context.Background() - handle := dom.StatsHandle() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(ctx, dom.InfoSchema())) @@ -123,7 +125,9 @@ func TestRefreshLastAnalysisDuration(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") tk.MustExec("insert into t2 values (1)") statistics.AutoAnalyzeMinCnt = 0 @@ -177,7 +181,9 @@ func testProcessDMLChanges(t *testing.T, partitioned bool) { if partitioned { tk.MustExec("use test") tk.MustExec("create table t1 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + statstestutil.HandleNextDDLEventWithTxn(handle) // Because we don't handle the DDL events in unit tests by default, // we need to use this way to make sure the stats record for the global table is created. // Insert some rows into the tables. @@ -191,7 +197,9 @@ func testProcessDMLChanges(t *testing.T, partitioned bool) { } else { tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) } tk.MustExec("insert into t1 values (1)") tk.MustExec("insert into t2 values (1), (2)") @@ -278,7 +286,9 @@ func TestProcessDMLChangesWithRunningJobs(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") tk.MustExec("insert into t2 values (1)") statistics.AutoAnalyzeMinCnt = 0 @@ -363,6 +373,7 @@ func TestRequeueMustRetryJobs(t *testing.T) { tk.MustExec("create database example_schema") tk.MustExec("use example_schema") tk.MustExec("create table example_table (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) initJobs(tk) insertMultipleFinishedJobs(tk, "example_table", "") statistics.AutoAnalyzeMinCnt = 0 @@ -415,7 +426,9 @@ func TestProcessDMLChangesWithLockedTables(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") tk.MustExec("insert into t2 values (1)") statistics.AutoAnalyzeMinCnt = 0 @@ -474,6 +487,7 @@ func TestProcessDMLChangesWithLockedPartitionsAndDynamicPruneMode(t *testing.T) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(ctx, dom.InfoSchema())) @@ -534,6 +548,7 @@ func TestProcessDMLChangesWithLockedPartitionsAndStaticPruneMode(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") tk.MustExec("set global tidb_partition_prune_mode = 'static'") statistics.AutoAnalyzeMinCnt = 0 @@ -618,6 +633,7 @@ func TestPQHandlesTableDeletionGracefully(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") statistics.AutoAnalyzeMinCnt = 0 defer func() { diff --git a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel index b8c337db3ddc4..c5d6ff6eb9af0 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel @@ -41,6 +41,7 @@ go_test( "//pkg/sessionctx/sysproctrack", "//pkg/statistics", "//pkg/statistics/handle/autoanalyze/priorityqueue", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", "//pkg/testkit", diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go index c3c4d4aa58cb5..dcbc201fd669e 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher" + "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" @@ -384,12 +385,13 @@ func TestDoNotRetryTableNotExistJob(t *testing.T) { }() store, dom := testkit.CreateMockStoreAndDomain(t) + handle := dom.StatsHandle() tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int, b int, index idx(a))") + testutil.HandleNextDDLEventWithTxn(handle) // Insert some data. tk.MustExec("insert into t1 values (1, 1)") - handle := dom.StatsHandle() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) sysProcTracker := dom.SysProcTracker() @@ -431,13 +433,15 @@ func TestAnalyzeHighestPriorityTablesWithFailedAnalysis(t *testing.T) { }() store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + handle := dom.StatsHandle() tk.MustExec("use test") tk.MustExec("set global tidb_enable_auto_analyze=true") tk.MustExec("set global tidb_auto_analyze_concurrency=2") tk.MustExec("create table t1 (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))") + testutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))") + testutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("analyze table t2") - handle := dom.StatsHandle() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) tk.MustExec("insert into t1 values (1, 1), (2, 2), (3, 3)") diff --git a/pkg/statistics/handle/ddl/subscriber.go b/pkg/statistics/handle/ddl/subscriber.go index a4af15a1815d0..49aeb70e39760 100644 --- a/pkg/statistics/handle/ddl/subscriber.go +++ b/pkg/statistics/handle/ddl/subscriber.go @@ -304,9 +304,9 @@ func (h subscriber) recordHistoricalStatsMeta( return history.RecordHistoricalStatsMeta( ctx, sctx, - id, startTS, util.StatsMetaHistorySourceSchemaChange, + id, ) } @@ -413,9 +413,7 @@ func updateGlobalTableStats4DropPartition( ctx, sctx, startTS, - variable.TableDelta{Count: count, Delta: delta}, - globalTableInfo.ID, - isLocked, + storage.NewDeltaUpdate(globalTableInfo.ID, variable.TableDelta{Count: count, Delta: delta}, isLocked), )) } @@ -597,9 +595,7 @@ func updateGlobalTableStats4TruncatePartition( ctx, sctx, startTS, - variable.TableDelta{Count: count, Delta: delta}, - globalTableInfo.ID, - isLocked, + storage.NewDeltaUpdate(globalTableInfo.ID, variable.TableDelta{Count: count, Delta: delta}, isLocked), ) if err != nil { fields := truncatePartitionsLogFields( diff --git a/pkg/statistics/handle/handletest/handle_test.go b/pkg/statistics/handle/handletest/handle_test.go index 84563dec1fc33..64c81379553f3 100644 --- a/pkg/statistics/handle/handletest/handle_test.go +++ b/pkg/statistics/handle/handletest/handle_test.go @@ -232,17 +232,9 @@ func TestLoadHist(t *testing.T) { newStatsTbl := h.GetTableStats(tableInfo) // The stats table is updated. require.False(t, oldStatsTbl == newStatsTbl) - // Only the TotColSize of histograms is updated. + // TotColSize of histograms is not changed. oldStatsTbl.ForEachColumnImmutable(func(id int64, hist *statistics.Column) bool { - require.Less(t, hist.TotColSize, newStatsTbl.GetCol(id).TotColSize) - - temp := hist.TotColSize - hist.TotColSize = newStatsTbl.GetCol(id).TotColSize - require.True(t, statistics.HistogramEqual(&hist.Histogram, &newStatsTbl.GetCol(id).Histogram, false)) - hist.TotColSize = temp - - require.True(t, hist.CMSketch.Equal(newStatsTbl.GetCol(id).CMSketch)) - require.Equal(t, newStatsTbl.GetCol(id).Info, hist.Info) + require.Equal(t, hist.TotColSize, newStatsTbl.GetCol(id).TotColSize) return false }) // Add column c3, we only update c3. @@ -1149,6 +1141,7 @@ func TestStatsCacheUpdateSkip(t *testing.T) { h := do.StatsHandle() testKit.MustExec("use test") testKit.MustExec("create table t (c1 int, c2 int)") + statstestutil.HandleNextDDLEventWithTxn(h) testKit.MustExec("insert into t values(1, 2)") require.NoError(t, h.DumpStatsDeltaToKV(true)) testKit.MustExec("analyze table t") diff --git a/pkg/statistics/handle/handletest/statstest/BUILD.bazel b/pkg/statistics/handle/handletest/statstest/BUILD.bazel index 096e8ebac5d22..67cf8fab44990 100644 --- a/pkg/statistics/handle/handletest/statstest/BUILD.bazel +++ b/pkg/statistics/handle/handletest/statstest/BUILD.bazel @@ -9,7 +9,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 13, + shard_count = 14, deps = [ "//pkg/config", "//pkg/parser/model", diff --git a/pkg/statistics/handle/handletest/statstest/stats_test.go b/pkg/statistics/handle/handletest/statstest/stats_test.go index 9fbde012bd3f1..5530bef97e477 100644 --- a/pkg/statistics/handle/handletest/statstest/stats_test.go +++ b/pkg/statistics/handle/handletest/statstest/stats_test.go @@ -439,3 +439,32 @@ func TestInitStatsIssue41938(t *testing.T) { require.NoError(t, h.InitStats(context.Background(), dom.InfoSchema())) h.SetLease(0) } + +func TestDumpStatsDeltaInBatch(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("create table t1 (c1 int, c2 int)") + testKit.MustExec("insert into t1 values (1, 1), (2, 2), (3, 3)") + testKit.MustExec("create table t2 (c1 int, c2 int)") + testKit.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3)") + + // Dump stats delta in one batch. + handle := dom.StatsHandle() + require.NoError(t, handle.DumpStatsDeltaToKV(true)) + + // Check the mysql.stats_meta table. + rows := testKit.MustQuery("select modify_count, count, version from mysql.stats_meta order by table_id").Rows() + require.Len(t, rows, 2) + + require.Equal(t, "3", rows[0][0]) + require.Equal(t, "3", rows[0][1]) + require.Equal(t, "3", rows[1][0]) + require.Equal(t, "3", rows[1][1]) + require.Equal( + t, + rows[0][2], + rows[1][2], + "The version of two tables should be the same because they are dumped in the same transaction.", + ) +} diff --git a/pkg/statistics/handle/history/BUILD.bazel b/pkg/statistics/handle/history/BUILD.bazel index 1a1a44f789cea..13737e039cf0a 100644 --- a/pkg/statistics/handle/history/BUILD.bazel +++ b/pkg/statistics/handle/history/BUILD.bazel @@ -9,10 +9,11 @@ go_library( "//pkg/meta/model", "//pkg/sessionctx", "//pkg/statistics/handle/cache", + "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/storage", "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", - "//pkg/util/logutil", + "//pkg/util/intest", "@com_github_pingcap_errors//:errors", "@org_uber_go_zap//:zap", ], diff --git a/pkg/statistics/handle/history/history_stats.go b/pkg/statistics/handle/history/history_stats.go index 1382d6ddbde4f..05634b0747db2 100644 --- a/pkg/statistics/handle/history/history_stats.go +++ b/pkg/statistics/handle/history/history_stats.go @@ -22,10 +22,11 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics/handle/cache" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" "github.com/pingcap/tidb/pkg/statistics/handle/storage" "github.com/pingcap/tidb/pkg/statistics/handle/types" "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/intest" "go.uber.org/zap" ) @@ -55,7 +56,7 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI return 0, errors.Trace(err) } if js == nil { - logutil.BgLogger().Warn("no stats data to record", zap.String("dbName", dbName), zap.String("tableName", tableInfo.Name.O)) + statslogutil.StatsLogger().Warn("no stats data to record", zap.String("dbName", dbName), zap.String("tableName", tableInfo.Name.O)) return 0, nil } var version uint64 @@ -66,32 +67,55 @@ func (sh *statsHistoryImpl) RecordHistoricalStatsToStorage(dbName string, tableI return version, err } -// RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history table. -func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool) { +// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history one by one. +func (sh *statsHistoryImpl) RecordHistoricalStatsMeta(version uint64, source string, enforce bool, tableIDs ...int64) { if version == 0 { return } - if !enforce { - tbl, ok := sh.statsHandle.Get(tableID) - if !ok { - return - } - if !tbl.IsInitialized() { - return + + var targetedTableIDs []int64 + if enforce { + targetedTableIDs = tableIDs + } else { + targetedTableIDs = make([]int64, 0, len(tableIDs)) + for _, tableID := range tableIDs { + tbl, ok := sh.statsHandle.Get(tableID) + if tableID == 0 || !ok || !tbl.IsInitialized() { + continue + } + targetedTableIDs = append(targetedTableIDs, tableID) } } + shouldSkipHistoricalStats := false err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error { if !sctx.GetSessionVars().EnableHistoricalStats { + shouldSkipHistoricalStats = true return nil } - return RecordHistoricalStatsMeta(util.StatsCtx, sctx, tableID, version, source) + return nil }, util.FlagWrapTxn) - if err != nil { // just log the error, hide the error from the outside caller. - logutil.BgLogger().Error("record historical stats meta failed", - zap.Int64("table-id", tableID), + if err != nil { + statslogutil.StatsLogger().Error("failed to check historical stats enable status", zap.Uint64("version", version), zap.String("source", source), + zap.Int64s("tableIDs", tableIDs), + zap.Int64s("targetedTableIDs", targetedTableIDs), zap.Error(err)) + return + } + if !shouldSkipHistoricalStats { + for _, tableID := range targetedTableIDs { + err := util.CallWithSCtx(sh.statsHandle.SPool(), func(sctx sessionctx.Context) error { + return RecordHistoricalStatsMeta(util.StatsCtx, sctx, version, source, tableID) + }, util.FlagWrapTxn) + if err != nil { + statslogutil.StatsLogger().Error("record historical stats meta failed", + zap.Uint64("version", version), + zap.String("source", source), + zap.Int64("tableID", tableID), + zap.Error(err)) + } + } } } @@ -104,32 +128,36 @@ func (sh *statsHistoryImpl) CheckHistoricalStatsEnable() (enable bool, err error return } -// RecordHistoricalStatsMeta records the historical stats meta. +// RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history with the given version and source. func RecordHistoricalStatsMeta( ctx context.Context, sctx sessionctx.Context, - tableID int64, version uint64, source string, + tableID int64, ) error { + intest.Assert(version != 0, "version should not be zero") + intest.Assert(tableID != 0, "tableID should not be zero") if tableID == 0 || version == 0 { return errors.Errorf("tableID %d, version %d are invalid", tableID, version) } + rows, _, err := util.ExecRowsWithCtx( ctx, sctx, - "select modify_count, count from mysql.stats_meta where table_id = %? and version = %?", + "SELECT modify_count, count FROM mysql.stats_meta WHERE table_id = %? and version = %? FOR UPDATE", tableID, version, ) if err != nil { return errors.Trace(err) } + intest.Assert(len(rows) != 0, "no historical meta stats can be recorded") if len(rows) == 0 { return errors.New("no historical meta stats can be recorded") } - modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1) + modifyCount, count := rows[0].GetInt64(0), rows[0].GetInt64(1) const sql = "REPLACE INTO mysql.stats_meta_history(table_id, modify_count, count, version, source, create_time) VALUES (%?, %?, %?, %?, %?, NOW())" if _, err := util.ExecWithCtx( ctx, @@ -144,6 +172,7 @@ func RecordHistoricalStatsMeta( return errors.Trace(err) } cache.TableRowStatsCache.Invalidate(tableID) + return nil } diff --git a/pkg/statistics/handle/storage/stats_read_writer.go b/pkg/statistics/handle/storage/stats_read_writer.go index ca3180fbf03b8..4937c6986874b 100644 --- a/pkg/statistics/handle/storage/stats_read_writer.go +++ b/pkg/statistics/handle/storage/stats_read_writer.go @@ -56,7 +56,7 @@ func (s *statsReadWriter) InsertColStats2KV(physicalID int64, colInfos []*model. statsVer := uint64(0) defer func() { if err == nil && statsVer != 0 { - s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false) + s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID) } }() @@ -76,7 +76,7 @@ func (s *statsReadWriter) InsertTableStats2KV(info *model.TableInfo, physicalID statsVer := uint64(0) defer func() { if err == nil && statsVer != 0 { - s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false) + s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID) } }() @@ -103,7 +103,7 @@ func (s *statsReadWriter) UpdateStatsMetaVersionForGC(physicalID int64) (err err statsVer := uint64(0) defer func() { if err == nil && statsVer != 0 { - s.statsHandler.RecordHistoricalStatsMeta(physicalID, statsVer, util.StatsMetaHistorySourceSchemaChange, false) + s.statsHandler.RecordHistoricalStatsMeta(statsVer, util.StatsMetaHistorySourceSchemaChange, false, physicalID) } }() @@ -134,7 +134,7 @@ func (s *statsReadWriter) SaveTableStatsToStorage(results *statistics.AnalyzeRes }, util.FlagWrapTxn) if err == nil && statsVer != 0 { tableID := results.TableID.GetStatisticsID() - s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, true) + s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, true, tableID) } return err } @@ -185,7 +185,7 @@ func (s *statsReadWriter) SaveStatsToStorage( return err }, util.FlagWrapTxn) if err == nil && statsVer != 0 { - s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false) + s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, false, tableID) } return } @@ -198,7 +198,7 @@ func (s *statsReadWriter) SaveMetaToStorage(tableID, count, modifyCount int64, s return err }, util.FlagWrapTxn) if err == nil && statsVer != 0 { - s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, source, false) + s.statsHandler.RecordHistoricalStatsMeta(statsVer, source, false, tableID) } return } @@ -211,7 +211,7 @@ func (s *statsReadWriter) InsertExtendedStats(statsName string, colIDs []int64, return err }, util.FlagWrapTxn) if err == nil && statsVer != 0 { - s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false) + s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID) } return } @@ -224,7 +224,7 @@ func (s *statsReadWriter) MarkExtendedStatsDeleted(statsName string, tableID int return err }, util.FlagWrapTxn) if err == nil && statsVer != 0 { - s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false) + s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID) } return } @@ -237,7 +237,7 @@ func (s *statsReadWriter) SaveExtendedStatsToStorage(tableID int64, extStats *st return err }, util.FlagWrapTxn) if err == nil && statsVer != 0 { - s.statsHandler.RecordHistoricalStatsMeta(tableID, statsVer, "extended stats", false) + s.statsHandler.RecordHistoricalStatsMeta(statsVer, "extended stats", false, tableID) } return } diff --git a/pkg/statistics/handle/storage/update.go b/pkg/statistics/handle/storage/update.go index 008388ed76a67..0660a67123ba7 100644 --- a/pkg/statistics/handle/storage/update.go +++ b/pkg/statistics/handle/storage/update.go @@ -56,57 +56,115 @@ func UpdateStatsVersion(ctx context.Context, sctx sessionctx.Context) error { return nil } -// UpdateStatsMeta update the stats meta stat for this Table. +// DeltaUpdate is the delta update for stats meta. +type DeltaUpdate struct { + Delta variable.TableDelta + TableID int64 + IsLocked bool +} + +// NewDeltaUpdate creates a new DeltaUpdate. +func NewDeltaUpdate(tableID int64, delta variable.TableDelta, isLocked bool) *DeltaUpdate { + return &DeltaUpdate{ + Delta: delta, + TableID: tableID, + IsLocked: isLocked, + } +} + +// UpdateStatsMeta updates the stats meta for multiple tables. +// It uses the INSERT INTO ... ON DUPLICATE KEY UPDATE syntax to fill the missing records. +// Note: Make sure call this function in a transaction. func UpdateStatsMeta( ctx context.Context, sctx sessionctx.Context, startTS uint64, - delta variable.TableDelta, - id int64, - isLocked bool, + updates ...*DeltaUpdate, ) (err error) { - if isLocked { - // use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_table_locked. - // Note: For locked tables, it is possible that the record gets deleted. So it can be negative. - _, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_table_locked (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+ - "update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)", - startTS, id, delta.Count, delta.Delta) - } else { - if delta.Delta < 0 { - // use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta. - _, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, 0) on duplicate key "+ - "update version = values(version), modify_count = modify_count + values(modify_count), count = if(count > %?, count - %?, 0)", - startTS, id, delta.Count, -delta.Delta, -delta.Delta) + if len(updates) == 0 { + return nil + } + + // Separate locked and unlocked updates + // In most cases, the number of locked tables is small. + lockedTableIDs := make([]string, 0, 20) + lockedValues := make([]string, 0, 20) + // In most cases, the number of unlocked tables is large. + unlockedTableIDs := make([]string, 0, len(updates)) + unlockedPosValues := make([]string, 0, max(len(updates)/2, 1)) + unlockedNegValues := make([]string, 0, max(len(updates)/2, 1)) + cacheInvalidateIDs := make([]int64, 0, len(updates)) + + for _, update := range updates { + if update.IsLocked { + lockedTableIDs = append(lockedTableIDs, fmt.Sprintf("%d", update.TableID)) + lockedValues = append(lockedValues, fmt.Sprintf("(%d, %d, %d, %d)", + startTS, update.TableID, update.Delta.Count, update.Delta.Delta)) } else { - // use INSERT INTO ... ON DUPLICATE KEY UPDATE here to fill missing stats_meta. - _, err = statsutil.ExecWithCtx(ctx, sctx, "insert into mysql.stats_meta (version, table_id, modify_count, count) values (%?, %?, %?, %?) on duplicate key "+ - "update version = values(version), modify_count = modify_count + values(modify_count), count = count + values(count)", startTS, - id, delta.Count, delta.Delta) + unlockedTableIDs = append(unlockedTableIDs, fmt.Sprintf("%d", update.TableID)) + if update.Delta.Delta < 0 { + unlockedNegValues = append(unlockedNegValues, fmt.Sprintf("(%d, %d, %d, %d)", + startTS, update.TableID, update.Delta.Count, -update.Delta.Delta)) + } else { + unlockedPosValues = append(unlockedPosValues, fmt.Sprintf("(%d, %d, %d, %d)", + startTS, update.TableID, update.Delta.Count, update.Delta.Delta)) + } + cacheInvalidateIDs = append(cacheInvalidateIDs, update.TableID) } - cache.TableRowStatsCache.Invalidate(id) } - return err -} -// DumpTableStatColSizeToKV dumps the column size stats to storage. -func DumpTableStatColSizeToKV(sctx sessionctx.Context, id int64, delta variable.TableDelta) error { - if len(delta.ColSize) == 0 { - return nil + // Lock the stats_meta and stats_table_locked tables using SELECT FOR UPDATE to prevent write conflicts. + // This ensures that we acquire the necessary locks before attempting to update the tables, reducing the likelihood + // of encountering lock conflicts during the update process. + lockedTableIDsStr := strings.Join(lockedTableIDs, ",") + if lockedTableIDsStr != "" { + if _, err = statsutil.ExecWithCtx(ctx, sctx, fmt.Sprintf("select * from mysql.stats_table_locked where table_id in (%s) for update", lockedTableIDsStr)); err != nil { + return err + } } - values := make([]string, 0, len(delta.ColSize)) - for histID, deltaColSize := range delta.ColSize { - if deltaColSize == 0 { - continue + + unlockedTableIDsStr := strings.Join(unlockedTableIDs, ",") + if unlockedTableIDsStr != "" { + if _, err = statsutil.ExecWithCtx(ctx, sctx, fmt.Sprintf("select * from mysql.stats_meta where table_id in (%s) for update", unlockedTableIDsStr)); err != nil { + return err } - values = append(values, fmt.Sprintf("(%d, 0, %d, 0, %d)", id, histID, deltaColSize)) } - if len(values) == 0 { - return nil + // Execute locked updates + if len(lockedValues) > 0 { + sql := fmt.Sprintf("insert into mysql.stats_table_locked (version, table_id, modify_count, count) values %s "+ + "on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+ + "count = count + values(count)", strings.Join(lockedValues, ",")) + if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil { + return err + } + } + + // Execute unlocked updates with positive delta + if len(unlockedPosValues) > 0 { + sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+ + "on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+ + "count = count + values(count)", strings.Join(unlockedPosValues, ",")) + if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil { + return err + } + } + + // Execute unlocked updates with negative delta + if len(unlockedNegValues) > 0 { + sql := fmt.Sprintf("insert into mysql.stats_meta (version, table_id, modify_count, count) values %s "+ + "on duplicate key update version = values(version), modify_count = modify_count + values(modify_count), "+ + "count = if(count > values(count), count - values(count), 0)", strings.Join(unlockedNegValues, ",")) + if _, err = statsutil.ExecWithCtx(ctx, sctx, sql); err != nil { + return err + } } - sql := fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, tot_col_size) "+ - "values %s on duplicate key update tot_col_size = GREATEST(0, tot_col_size + values(tot_col_size))", strings.Join(values, ",")) - _, _, err := statsutil.ExecRows(sctx, sql) - return errors.Trace(err) + + // Invalidate cache for all unlocked tables + for _, id := range cacheInvalidateIDs { + cache.TableRowStatsCache.Invalidate(id) + } + + return nil } // InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta. diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 40028176ad98b..de61f873696cf 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -108,8 +108,8 @@ type IndexUsage interface { // StatsHistory is used to manage historical stats. type StatsHistory interface { - // RecordHistoricalStatsMeta records stats meta of the specified version to stats_meta_history. - RecordHistoricalStatsMeta(tableID int64, version uint64, source string, enforce bool) + // RecordHistoricalStatsMeta records the historical stats meta in mysql.stats_meta_history one by one. + RecordHistoricalStatsMeta(version uint64, source string, enforce bool, tableIDs ...int64) // CheckHistoricalStatsEnable check whether historical stats is enabled. CheckHistoricalStatsEnable() (enable bool, err error) diff --git a/pkg/statistics/handle/updatetest/update_test.go b/pkg/statistics/handle/updatetest/update_test.go index 76f20d0e83f85..ef2ecddae9548 100644 --- a/pkg/statistics/handle/updatetest/update_test.go +++ b/pkg/statistics/handle/updatetest/update_test.go @@ -135,7 +135,7 @@ func TestSingleSessionInsert(t *testing.T) { rs.Check(testkit.Rows("40", "70")) rs = testKit.MustQuery("select tot_col_size from mysql.stats_histograms").Sort() - rs.Check(testkit.Rows("0", "0", "20", "20")) + rs.Check(testkit.Rows("0", "0", "10", "10")) // test dump delta only when `modify count / count` is greater than the ratio. originValue := usage.DumpStatsDeltaRatio @@ -324,7 +324,7 @@ func TestUpdatePartition(t *testing.T) { statsTbl := h.GetPartitionStats(tableInfo, def.ID) require.Equal(t, int64(1), statsTbl.ModifyCount) require.Equal(t, int64(1), statsTbl.RealtimeCount) - require.Equal(t, int64(2), statsTbl.GetCol(bColID).TotColSize) + require.Equal(t, int64(0), statsTbl.GetCol(bColID).TotColSize) } testKit.MustExec(`update t set a = a + 1, b = "aa"`) @@ -334,7 +334,7 @@ func TestUpdatePartition(t *testing.T) { statsTbl := h.GetPartitionStats(tableInfo, def.ID) require.Equal(t, int64(2), statsTbl.ModifyCount) require.Equal(t, int64(1), statsTbl.RealtimeCount) - require.Equal(t, int64(3), statsTbl.GetCol(bColID).TotColSize) + require.Equal(t, int64(0), statsTbl.GetCol(bColID).TotColSize) } testKit.MustExec("delete from t") @@ -432,8 +432,7 @@ func TestAutoUpdate(t *testing.T) { // Modify count is non-zero means that we do not analyze the table. require.Equal(t, int64(1), stats.ModifyCount) stats.ForEachColumnImmutable(func(_ int64, item *statistics.Column) bool { - // TotColSize = 27, because the table has not been analyzed, and insert statement will add 3(length of 'eee') to TotColSize. - require.Equal(t, int64(27), item.TotColSize) + require.Equal(t, int64(23), item.TotColSize) return true }) @@ -514,6 +513,7 @@ func TestIssue25700(t *testing.T) { tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("CREATE TABLE `t` ( `ldecimal` decimal(32,4) DEFAULT NULL, `rdecimal` decimal(32,4) DEFAULT NULL, `gen_col` decimal(36,4) GENERATED ALWAYS AS (`ldecimal` + `rdecimal`) VIRTUAL, `col_timestamp` timestamp(3) NULL DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;") + statstestutil.HandleNextDDLEventWithTxn(dom.StatsHandle()) tk.MustExec("analyze table t") tk.MustExec("INSERT INTO `t` (`ldecimal`, `rdecimal`, `col_timestamp`) VALUES (2265.2200, 9843.4100, '1999-12-31 16:00:00')" + strings.Repeat(", (2265.2200, 9843.4100, '1999-12-31 16:00:00')", int(statistics.AutoAnalyzeMinCnt))) require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true)) diff --git a/pkg/statistics/handle/usage/BUILD.bazel b/pkg/statistics/handle/usage/BUILD.bazel index 9a02fa38858d7..4bc99808740ab 100644 --- a/pkg/statistics/handle/usage/BUILD.bazel +++ b/pkg/statistics/handle/usage/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/metrics", "//pkg/sessionctx", "//pkg/sessionctx/variable", + "//pkg/statistics/handle/logutil", "//pkg/statistics/handle/storage", "//pkg/statistics/handle/types", "//pkg/statistics/handle/usage/indexusage", @@ -26,6 +27,7 @@ go_library( "//pkg/util/sqlescape", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/statistics/handle/usage/session_stats_collect.go b/pkg/statistics/handle/usage/session_stats_collect.go index 18bab8f48723a..372dfa335cb77 100644 --- a/pkg/statistics/handle/usage/session_stats_collect.go +++ b/pkg/statistics/handle/usage/session_stats_collect.go @@ -29,12 +29,14 @@ import ( "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" "github.com/pingcap/tidb/pkg/statistics/handle/storage" utilstats "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/sqlescape" + "go.uber.org/zap" ) var ( @@ -82,6 +84,11 @@ func (s *statsUsageImpl) needDumpStatsDelta(is infoschema.InfoSchema, dumpAll bo return false } +const ( + dumpDeltaBatchSize = 100_000 + tooSlowThreshold = 20 * time.Second +) + // DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV. // If the mode is `DumpDelta`, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio. func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error { @@ -91,99 +98,184 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error { dur := time.Since(start) metrics.StatsDeltaUpdateHistogram.Observe(dur.Seconds()) }() + s.SweepSessionStatsList() deltaMap := s.SessionTableDelta().GetDeltaAndReset() defer func() { s.SessionTableDelta().Merge(deltaMap) }() + if time.Since(start) > tooSlowThreshold { + statslogutil.SingletonStatsSamplerLogger().Warn("Sweeping session list is too slow", + zap.Int("tableCount", len(deltaMap)), + zap.Duration("duration", time.Since(start))) + } + + // Sort table IDs to ensure a consistent dump order to reduce the chance of deadlock. + tableIDs := make([]int64, 0, len(deltaMap)) + for id := range deltaMap { + tableIDs = append(tableIDs, id) + } + slices.Sort(tableIDs) + + // Dump stats delta in batches. + for i := 0; i < len(tableIDs); i += dumpDeltaBatchSize { + end := i + dumpDeltaBatchSize + if end > len(tableIDs) { + end = len(tableIDs) + } - return utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { - is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) - currentTime := time.Now() - for id, item := range deltaMap { - if !s.needDumpStatsDelta(is, dumpAll, id, item, currentTime) { - continue + batchTableIDs := tableIDs[i:end] + var ( + statsVersion uint64 + batchUpdates []*storage.DeltaUpdate + ) + batchStart := time.Now() + err := utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { + is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) + batchUpdates = make([]*storage.DeltaUpdate, 0, len(batchTableIDs)) + // Collect all updates in the batch. + for _, id := range batchTableIDs { + item := deltaMap[id] + if !s.needDumpStatsDelta(is, dumpAll, id, item, batchStart) { + continue + } + batchUpdates = append(batchUpdates, storage.NewDeltaUpdate(id, item, false)) } - updated, err := s.dumpTableStatCountToKV(is, id, item) + if time.Since(batchStart) > tooSlowThreshold { + statslogutil.SingletonStatsSamplerLogger().Warn("Collecting batch updates is too slow", + zap.Int("tableCount", len(batchUpdates)), + zap.Duration("duration", time.Since(batchStart))) + } + + if len(batchUpdates) == 0 { + return nil + } + + // Process all updates in the batch with a single transaction. + // Note: batchUpdates may be modified in dumpStatsDeltaToKV. (e.g. sorting, updating IsLocked) + startTs, updated, err := s.dumpStatsDeltaToKV(is, sctx, batchUpdates) if err != nil { return errors.Trace(err) } - if updated { - UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil) + statsVersion = startTs + // Note: Ensure we use the updated slice after dumpStatsDeltaToKV, + // because dumpStatsDeltaToKV may modify the underlying array of batchUpdates. + // For example, dumpStatsDeltaToKV may sort the array. + batchUpdates = updated + intest.AssertFunc( + func() bool { + return slices.IsSortedFunc(batchUpdates, func(i, j *storage.DeltaUpdate) int { + return cmp.Compare(i.TableID, j.TableID) + }) + }, + "batchUpdates should be sorted by table ID", + ) + + // Update deltaMap after the batch is successfully dumped. + for _, update := range batchUpdates { + delete(deltaMap, update.TableID) } - if err = storage.DumpTableStatColSizeToKV(sctx, id, item); err != nil { - delete(deltaMap, id) - return errors.Trace(err) + + if time.Since(batchStart) > tooSlowThreshold { + statslogutil.SingletonStatsSamplerLogger().Warn("Dumping batch updates is too slow", + zap.Int("tableCount", len(batchUpdates)), + zap.Duration("duration", time.Since(batchStart))) } - if updated { - delete(deltaMap, id) - } else { - m := deltaMap[id] - m.ColSize = nil - deltaMap[id] = m + + return nil + }, utilstats.FlagWrapTxn) + if err != nil { + return errors.Trace(err) + } + startRecordHistoricalStatsMeta := time.Now() + unlockedTableIDs := make([]int64, 0, len(batchUpdates)) + for _, update := range batchUpdates { + if !update.IsLocked { + failpoint.Inject("panic-when-record-historical-stats-meta", func() { + panic("panic when record historical stats meta") + }) + unlockedTableIDs = append(unlockedTableIDs, update.TableID) } } - return nil - }) + s.statsHandle.RecordHistoricalStatsMeta(statsVersion, "flush stats", false, unlockedTableIDs...) + // Log a warning if recording historical stats meta takes too long, as it can be slow for large table counts + if time.Since(startRecordHistoricalStatsMeta) > time.Minute*15 { + statslogutil.SingletonStatsSamplerLogger().Warn("Recording historical stats meta is too slow", + zap.Int("tableCount", len(batchUpdates)), + zap.Duration("duration", time.Since(startRecordHistoricalStatsMeta))) + } + } + + return nil } -// dumpTableStatDeltaToKV dumps a single delta with some table to KV and updates the version. -// For a partitioned table, we will update its global-stats as well. -func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableID int64, delta variable.TableDelta) (updated bool, err error) { - statsVersion := uint64(0) - isLocked := false - defer func() { - // Only record the historical stats meta when the table is not locked because all stats meta are stored in the locked table. - if err == nil && statsVersion != 0 && !isLocked { - failpoint.Inject("panic-when-record-historical-stats-meta", func() { - panic("panic when record historical stats meta") - }) - s.statsHandle.RecordHistoricalStatsMeta(physicalTableID, statsVersion, "flush stats", false) - } - }() - if delta.Count == 0 { - return true, nil +// dumpStatsDeltaToKV processes and writes multiple table stats count deltas to KV storage in batches. +// Note: The `batchUpdates` parameter may be modified during the execution of this function. +// +// 1. Handles partitioned tables: +// - For partitioned tables, the function ensures that the global statistics are updated appropriately +// in addition to the individual partition statistics. +// +// 2. Stashes lock information: +// - Records lock information for each table or partition. +func (s *statsUsageImpl) dumpStatsDeltaToKV( + is infoschema.InfoSchema, + sctx sessionctx.Context, + updates []*storage.DeltaUpdate, +) (statsVersion uint64, updated []*storage.DeltaUpdate, err error) { + if len(updates) == 0 { + return 0, nil, nil + } + beforeLen := len(updates) + statsVersion, err = utilstats.GetStartTS(sctx) + if err != nil { + return 0, nil, errors.Trace(err) } - err = utilstats.CallWithSCtx(s.statsHandle.SPool(), func(sctx sessionctx.Context) error { - statsVersion, err = utilstats.GetStartTS(sctx) - if err != nil { - return errors.Trace(err) + // Collect all table IDs that need lock checking. + allTableIDs := make([]int64, 0, len(updates)) + for _, update := range updates { + // No need to update if the delta is zero. + if update.Delta.Count == 0 { + continue } - - tbl, _, _ := is.FindTableByPartitionID(physicalTableID) - // Check if the table and its partitions are locked. - tidAndPid := make([]int64, 0, 2) - if tbl != nil { - tidAndPid = append(tidAndPid, tbl.Meta().ID) + // Add psychical table ID. + allTableIDs = append(allTableIDs, update.TableID) + // Add parent table ID if it's a partition table. + if tbl, _, _ := is.FindTableByPartitionID(update.TableID); tbl != nil { + allTableIDs = append(allTableIDs, tbl.Meta().ID) } - tidAndPid = append(tidAndPid, physicalTableID) - lockedTables, err := s.statsHandle.GetLockedTables(tidAndPid...) - if err != nil { - return err + } + + // Batch get lock status for all tables. + lockedTables, err := s.statsHandle.GetLockedTables(allTableIDs...) + if err != nil { + return 0, nil, errors.Trace(err) + } + + // Prepare batch updates + for _, update := range updates { + // No need to update if the delta is zero. + if update.Delta.Count == 0 { + continue } - var affectedRows uint64 - // If it's a partitioned table and its global-stats exists, - // update its count and modify_count as well. - if tbl != nil { - // We need to check if the table and the partition are locked. + tbl, _, _ := is.FindTableByPartitionID(update.TableID) + if tbl != nil { // It's a partition table. + tableID := tbl.Meta().ID isTableLocked := false isPartitionLocked := false - tableID := tbl.Meta().ID + if _, ok := lockedTables[tableID]; ok { isTableLocked = true } - if _, ok := lockedTables[physicalTableID]; ok { + if _, ok := lockedTables[update.TableID]; ok { isPartitionLocked = true } + tableOrPartitionLocked := isTableLocked || isPartitionLocked - isLocked = tableOrPartitionLocked - if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta, - physicalTableID, tableOrPartitionLocked); err != nil { - return err - } - affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows() + update.IsLocked = tableOrPartitionLocked + // If the partition is locked, we don't need to update the global-stats. // We will update its global-stats when the partition is unlocked. // 1. If table is locked and partition is locked, we only stash the delta in the partition's lock info. @@ -196,31 +288,32 @@ func (s *statsUsageImpl) dumpTableStatCountToKV(is infoschema.InfoSchema, physic // 4. If table is not locked and partition is not locked, we update the global-stats. // To sum up, we only need to update the global-stats when the table and the partition are not locked. if !isTableLocked && !isPartitionLocked { - // If it's a partitioned table and its global-stats exists, update its count and modify_count as well. - if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta, tableID, isTableLocked); err != nil { - return err - } - affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows() + updates = append(updates, storage.NewDeltaUpdate(tableID, update.Delta, isTableLocked)) } } else { - // This is a non-partitioned table. - // Check if it's locked. isTableLocked := false - if _, ok := lockedTables[physicalTableID]; ok { + if _, ok := lockedTables[update.TableID]; ok { isTableLocked = true } - isLocked = isTableLocked - if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, delta, - physicalTableID, isTableLocked); err != nil { - return err - } - affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows() + update.IsLocked = isTableLocked } + } + intest.Assert(len(updates) >= beforeLen, "updates can only be appended") + if len(updates) > beforeLen { + // Resort updates after appending new updates. + slices.SortFunc(updates, func(i, j *storage.DeltaUpdate) int { + return cmp.Compare(i.TableID, j.TableID) + }) + } + + // Batch update stats meta. + if err = storage.UpdateStatsMeta(utilstats.StatsCtx, sctx, statsVersion, updates...); err != nil { + return 0, nil, errors.Trace(err) + } - updated = affectedRows > 0 - return nil - }, utilstats.FlagWrapTxn) - return + // Because we may sort the updates, we need to return the updated slice. + // Otherwise the caller may use the original slice and get wrong results. + return statsVersion, updates, nil } // DumpColStatsUsageToKV sweeps the whole list, updates the column stats usage map and dumps it to KV. @@ -305,10 +398,10 @@ func (s *SessionStatsItem) Delete() { } // Update will updates the delta and count for one table id. -func (s *SessionStatsItem) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { +func (s *SessionStatsItem) Update(id int64, delta int64, count int64) { s.Lock() defer s.Unlock() - s.mapper.Update(id, delta, count, colSize) + s.mapper.Update(id, delta, count) } // ClearForTest clears the mapper for test. @@ -455,10 +548,10 @@ func (m *TableDelta) GetDeltaAndReset() map[int64]variable.TableDelta { } // Update updates the delta of the table. -func (m *TableDelta) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { +func (m *TableDelta) Update(id int64, delta int64, count int64) { m.lock.Lock() defer m.lock.Unlock() - UpdateTableDeltaMap(m.delta, id, delta, count, colSize) + UpdateTableDeltaMap(m.delta, id, delta, count) } // Merge merges the deltaMap into the TableDelta. @@ -469,23 +562,15 @@ func (m *TableDelta) Merge(deltaMap map[int64]variable.TableDelta) { m.lock.Lock() defer m.lock.Unlock() for id, item := range deltaMap { - UpdateTableDeltaMap(m.delta, id, item.Delta, item.Count, &item.ColSize) + UpdateTableDeltaMap(m.delta, id, item.Delta, item.Count) } } // UpdateTableDeltaMap updates the delta of the table. -func UpdateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64, colSize *map[int64]int64) { +func UpdateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64) { item := m[id] item.Delta += delta item.Count += count - if item.ColSize == nil { - item.ColSize = make(map[int64]int64) - } - if colSize != nil { - for key, val := range *colSize { - item.ColSize[key] += val - } - } m[id] = item } diff --git a/pkg/table/BUILD.bazel b/pkg/table/BUILD.bazel index 85065af129027..12383de4ef8c6 100644 --- a/pkg/table/BUILD.bazel +++ b/pkg/table/BUILD.bazel @@ -38,7 +38,6 @@ go_library( "//pkg/util/sqlexec", "//pkg/util/timeutil", "//pkg/util/tracing", - "@com_github_bits_and_blooms_bitset//:bitset", "@com_github_pingcap_errors//:errors", "@org_uber_go_zap//:zap", ], diff --git a/pkg/table/table.go b/pkg/table/table.go index e1e5183302f61..d5c2fcbacea8b 100644 --- a/pkg/table/table.go +++ b/pkg/table/table.go @@ -22,7 +22,6 @@ import ( "context" "time" - "github.com/bits-and-blooms/bitset" mysql "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/exprctx" @@ -218,7 +217,6 @@ type UpdateRecordOption interface { // RemoveRecordOpt contains the options will be used when removing a record. type RemoveRecordOpt struct { indexesLayoutOffset IndexesLayout - columnSize *ColumnsSizeHelper } // HasIndexesLayout returns whether the RemoveRecordOpt has indexes layout. @@ -236,11 +234,6 @@ func (opt *RemoveRecordOpt) GetIndexLayout(indexID int64) IndexRowLayoutOption { return opt.indexesLayoutOffset[indexID] } -// GetColumnSizeOpt returns the ColumnSizeOption of the RemoveRecordOpt. -func (opt *RemoveRecordOpt) GetColumnSizeOpt() *ColumnsSizeHelper { - return opt.columnSize -} - // NewRemoveRecordOpt creates a new RemoveRecordOpt with options. func NewRemoveRecordOpt(opts ...RemoveRecordOption) *RemoveRecordOpt { opt := &RemoveRecordOpt{} @@ -255,17 +248,6 @@ type RemoveRecordOption interface { applyRemoveRecordOpt(*RemoveRecordOpt) } -// ExtraPartialRowOption is the combined one of IndexesLayout and ColumnSizeOption. -type ExtraPartialRowOption struct { - IndexesRowLayout IndexesLayout - ColumnsSizeHelper *ColumnsSizeHelper -} - -func (e *ExtraPartialRowOption) applyRemoveRecordOpt(opt *RemoveRecordOpt) { - opt.indexesLayoutOffset = e.IndexesRowLayout - opt.columnSize = e.ColumnsSizeHelper -} - // IndexRowLayoutOption is the option for index row layout. // It is used to specify the order of the index columns in the row. type IndexRowLayoutOption []int @@ -282,21 +264,8 @@ func (idx IndexesLayout) GetIndexLayout(idxID int64) IndexRowLayoutOption { return idx[idxID] } -// ColumnsSizeHelper records the column size information. -// We're updating the total column size and total row size used in table statistics when doing DML. -// If the column is pruned when doing DML, we can't get the accurate size of the column. So we need the estimated avg size. -// - If the column is not pruned, we can calculate its acurate size by the real data. -// - Otherwise, we use the estimated avg size given by table statistics and field type information. -type ColumnsSizeHelper struct { - // NotPruned is a bitset to record the columns that are not pruned. - // The ith bit is 1 means the ith public column is not pruned. - NotPruned *bitset.BitSet - // If the column is pruned, we use the estimated avg size. They are stored by their ordinal in the table. - // The ith element is the estimated size of the ith pruned public column. - AvgSizes []float64 - // If the column is not pruned, we use the accurate size. They are stored by their ordinal in the pruned row. - // The ith element is the position of the ith public column in the pruned row. - PublicColsLayout []int +func (idx IndexesLayout) applyRemoveRecordOpt(opt *RemoveRecordOpt) { + opt.indexesLayoutOffset = idx } // CommonMutateOptFunc is a function to provide common options for mutating a table. diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index fe6c87fe1b39b..2c1ab0995987d 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -536,21 +536,7 @@ func (t *TableCommon) updateRecord(sctx table.MutateContext, txn kv.Transaction, memBuffer.Release(sh) if s, ok := sctx.GetStatisticsSupport(); ok { - colSizeBuffer := mutateBuffers.GetColSizeDeltaBufferWithCap(len(t.Cols())) - for id, col := range t.Cols() { - size, err := codec.EstimateValueSize(tc, newData[id]) - if err != nil { - continue - } - newLen := size - 1 - size, err = codec.EstimateValueSize(tc, oldData[id]) - if err != nil { - continue - } - oldLen := size - 1 - colSizeBuffer.AddColSizeDelta(col.ID, int64(newLen-oldLen)) - } - s.UpdatePhysicalTableDelta(t.physicalTableID, 0, 1, colSizeBuffer) + s.UpdatePhysicalTableDelta(t.physicalTableID, 0, 1) } return nil } @@ -917,15 +903,7 @@ func (t *TableCommon) addRecord(sctx table.MutateContext, txn kv.Transaction, r memBuffer.Release(sh) if s, ok := sctx.GetStatisticsSupport(); ok { - colSizeBuffer := sctx.GetMutateBuffers().GetColSizeDeltaBufferWithCap(len(t.Cols())) - for id, col := range t.Cols() { - size, err := codec.EstimateValueSize(tc, r[id]) - if err != nil { - continue - } - colSizeBuffer.AddColSizeDelta(col.ID, int64(size-1)) - } - s.UpdatePhysicalTableDelta(t.physicalTableID, 1, 1, colSizeBuffer) + s.UpdatePhysicalTableDelta(t.physicalTableID, 1, 1) } return recordID, nil } @@ -1181,33 +1159,8 @@ func (t *TableCommon) removeRecord(ctx table.MutateContext, txn kv.Transaction, memBuffer.Release(sh) if s, ok := ctx.GetStatisticsSupport(); ok { - // a reusable buffer to save malloc - // Note: The buffer should not be referenced or modified outside this function. - // It can only act as a temporary buffer for the current function call. - colSizeBuffer := ctx.GetMutateBuffers().GetColSizeDeltaBufferWithCap(len(t.Cols())) - pruned, notPruned := 0, 0 - columnSizeOpt := opt.GetColumnSizeOpt() - var size int - for id, col := range t.Cols() { - columnOffset := id - if columnSizeOpt != nil { - if !columnSizeOpt.NotPruned.Test(uint(id)) { - size = int(columnSizeOpt.AvgSizes[pruned]) - pruned++ - colSizeBuffer.AddColSizeDelta(col.ID, -int64(size-1)) - continue - } - columnOffset = columnSizeOpt.PublicColsLayout[notPruned] - notPruned++ - } - size, err = codec.EstimateValueSize(tc, r[columnOffset]) - if err != nil { - continue - } - colSizeBuffer.AddColSizeDelta(col.ID, -int64(size-1)) - } s.UpdatePhysicalTableDelta( - t.physicalTableID, -1, 1, colSizeBuffer, + t.physicalTableID, -1, 1, ) } return err diff --git a/pkg/table/tblctx/BUILD.bazel b/pkg/table/tblctx/BUILD.bazel index 72906368a6f18..f7ea643d76115 100644 --- a/pkg/table/tblctx/BUILD.bazel +++ b/pkg/table/tblctx/BUILD.bazel @@ -32,7 +32,7 @@ go_test( srcs = ["buffers_test.go"], embed = [":tblctx"], flaky = True, - shard_count = 6, + shard_count = 5, deps = [ "//pkg/errctx", "//pkg/kv", diff --git a/pkg/table/tblctx/buffers.go b/pkg/table/tblctx/buffers.go index 53a33fdaa648b..5e9e8d93d0bb1 100644 --- a/pkg/table/tblctx/buffers.go +++ b/pkg/table/tblctx/buffers.go @@ -113,35 +113,6 @@ func (b *CheckRowBuffer) Reset(capacity int) { b.rowToCheck = ensureCapacityAndReset(b.rowToCheck, 0, capacity) } -// ColSizeDeltaBuffer implements variable.DeltaCols -var _ variable.DeltaCols = &ColSizeDeltaBuffer{} - -// ColSizeDeltaBuffer is a buffer to store the change of column size. -type ColSizeDeltaBuffer struct { - delta []variable.ColSize -} - -// Reset resets the inner buffers to a capacity. -func (b *ColSizeDeltaBuffer) Reset(capacity int) { - b.delta = ensureCapacityAndReset(b.delta, 0, capacity) -} - -// AddColSizeDelta adds the column size delta to the buffer. -func (b *ColSizeDeltaBuffer) AddColSizeDelta(colID int64, size int64) { - b.delta = append(b.delta, variable.ColSize{ColID: colID, Size: size}) -} - -// UpdateColSizeMap updates the column size map which uses columID as the map key and column size as the value. -func (b *ColSizeDeltaBuffer) UpdateColSizeMap(m map[int64]int64) map[int64]int64 { - if m == nil && len(b.delta) > 0 { - m = make(map[int64]int64, len(b.delta)) - } - for _, delta := range b.delta { - m[delta.ColID] += delta.Size - } - return m -} - // MutateBuffers is a memory pool for table related memory allocation that aims to reuse memory // and saves allocation. // It is used in table operations like AddRecord/UpdateRecord/DeleteRecord. @@ -149,10 +120,9 @@ func (b *ColSizeDeltaBuffer) UpdateColSizeMap(m map[int64]int64) map[int64]int64 // Because inner slices are reused, you should not call the get methods again before finishing the previous usage. // Otherwise, the previous data will be overwritten. type MutateBuffers struct { - stmtBufs *variable.WriteStmtBufs - encodeRow *EncodeRowBuffer - checkRow *CheckRowBuffer - colSizeDelta *ColSizeDeltaBuffer + stmtBufs *variable.WriteStmtBufs + encodeRow *EncodeRowBuffer + checkRow *CheckRowBuffer } // NewMutateBuffers creates a new `MutateBuffers`. @@ -163,8 +133,7 @@ func NewMutateBuffers(stmtBufs *variable.WriteStmtBufs) *MutateBuffers { encodeRow: &EncodeRowBuffer{ writeStmtBufs: stmtBufs, }, - checkRow: &CheckRowBuffer{}, - colSizeDelta: &ColSizeDeltaBuffer{}, + checkRow: &CheckRowBuffer{}, } } @@ -194,20 +163,6 @@ func (b *MutateBuffers) GetCheckRowBufferWithCap(capacity int) *CheckRowBuffer { return buffer } -// GetColSizeDeltaBufferWithCap gets the buffer for column size delta collection -// and resets the capacity of its inner slice. -// Usage: -// 1. Call `GetColSizeDeltaBufferWithCap` to get the buffer. -// 2. Call `ColSizeDeltaBuffer.AddColSizeDelta` for every column to add column size delta. -// 3. Call `ColSizeDeltaBuffer.UpdateColSizeMap` to update a column size map. -// Because the inner slices are reused, you should not call this method again before finishing the previous usage. -// Otherwise, the previous data will be overwritten. -func (b *MutateBuffers) GetColSizeDeltaBufferWithCap(capacity int) *ColSizeDeltaBuffer { - buffer := b.colSizeDelta - buffer.Reset(capacity) - return buffer -} - // GetWriteStmtBufs returns the `*variable.WriteStmtBufs` func (b *MutateBuffers) GetWriteStmtBufs() *variable.WriteStmtBufs { return b.stmtBufs diff --git a/pkg/table/tblctx/buffers_test.go b/pkg/table/tblctx/buffers_test.go index 266c2bb8108e1..264efe7619cec 100644 --- a/pkg/table/tblctx/buffers_test.go +++ b/pkg/table/tblctx/buffers_test.go @@ -201,35 +201,6 @@ func TestCheckRowBuffer(t *testing.T) { require.Equal(t, 6, cap(buffer.rowToCheck)) } -func TestColSizeDeltaBuffer(t *testing.T) { - buffer := &ColSizeDeltaBuffer{} - buffer.Reset(6) - require.Equal(t, 0, len(buffer.delta)) - require.Equal(t, 6, cap(buffer.delta)) - require.Nil(t, buffer.UpdateColSizeMap(nil)) - - buffer.AddColSizeDelta(1, 2) - buffer.AddColSizeDelta(3, -4) - buffer.AddColSizeDelta(10, 11) - require.Equal(t, []variable.ColSize{{ColID: 1, Size: 2}, {ColID: 3, Size: -4}, {ColID: 10, Size: 11}}, buffer.delta) - - require.Equal(t, map[int64]int64{1: 2, 3: -4, 10: 11}, buffer.UpdateColSizeMap(nil)) - m := make(map[int64]int64) - m2 := buffer.UpdateColSizeMap(m) - require.Equal(t, map[int64]int64{1: 2, 3: -4, 10: 11}, m2) - require.Equal(t, m2, m) - - m = map[int64]int64{1: 3, 3: 5, 5: 7} - m2 = buffer.UpdateColSizeMap(m) - require.Equal(t, map[int64]int64{1: 5, 3: 1, 5: 7, 10: 11}, m2) - require.Equal(t, m2, m) - - // reset should not shrink the capacity - buffer.Reset(2) - require.Equal(t, 0, len(buffer.delta)) - require.Equal(t, 6, cap(buffer.delta)) -} - func TestMutateBuffersGetter(t *testing.T) { stmtBufs := &variable.WriteStmtBufs{} buffers := NewMutateBuffers(stmtBufs) @@ -241,9 +212,6 @@ func TestMutateBuffersGetter(t *testing.T) { require.Equal(t, 6, cap(update.rowToCheck)) require.Equal(t, 6, cap(update.rowToCheck)) - colSize := buffers.GetColSizeDeltaBufferWithCap(6) - require.Equal(t, 6, cap(colSize.delta)) - require.Same(t, stmtBufs, buffers.GetWriteStmtBufs()) } diff --git a/pkg/table/tblctx/table.go b/pkg/table/tblctx/table.go index b9e0ecad3ac68..a5b30edc7fb4c 100644 --- a/pkg/table/tblctx/table.go +++ b/pkg/table/tblctx/table.go @@ -38,7 +38,7 @@ type RowEncodingConfig struct { // StatisticsSupport is used for statistics update operations. type StatisticsSupport interface { // UpdatePhysicalTableDelta updates the physical table delta. - UpdatePhysicalTableDelta(physicalTableID int64, delta int64, count int64, cols variable.DeltaCols) + UpdatePhysicalTableDelta(physicalTableID int64, delta int64, count int64) } // CachedTableSupport is used for cached table operations diff --git a/pkg/table/tblsession/table.go b/pkg/table/tblsession/table.go index 2ede0148886a5..e28aaaee60955 100644 --- a/pkg/table/tblsession/table.go +++ b/pkg/table/tblsession/table.go @@ -127,10 +127,10 @@ func (ctx *MutateContext) GetStatisticsSupport() (tblctx.StatisticsSupport, bool // UpdatePhysicalTableDelta implements the StatisticsSupport interface. func (ctx *MutateContext) UpdatePhysicalTableDelta( - physicalTableID int64, delta int64, count int64, cols variable.DeltaCols, + physicalTableID int64, delta int64, count int64, ) { if txnCtx := ctx.vars().TxnCtx; txnCtx != nil { - txnCtx.UpdateDeltaForTable(physicalTableID, delta, count, cols) + txnCtx.UpdateDeltaForTable(physicalTableID, delta, count) } } diff --git a/pkg/table/tblsession/table_test.go b/pkg/table/tblsession/table_test.go index d48d524752dd8..f6487631fbfc1 100644 --- a/pkg/table/tblsession/table_test.go +++ b/pkg/table/tblsession/table_test.go @@ -103,14 +103,13 @@ func TestSessionMutateContextFields(t *testing.T) { require.NotNil(t, statisticsSupport) require.Equal(t, 0, len(txnCtx.TableDeltaMap)) statisticsSupport.UpdatePhysicalTableDelta( - 12, 1, 2, variable.DeltaColsMap(map[int64]int64{3: 4, 5: 6}), + 12, 1, 2, ) require.Equal(t, 1, len(txnCtx.TableDeltaMap)) deltaMap := txnCtx.TableDeltaMap[12] require.Equal(t, int64(12), deltaMap.TableID) require.Equal(t, int64(1), deltaMap.Delta) require.Equal(t, int64(2), deltaMap.Count) - require.Equal(t, map[int64]int64{3: 4, 5: 6}, deltaMap.ColSize) // cached table support sctx.GetSessionVars().TxnCtx = nil cachedTableSupport, ok := ctx.GetCachedTableSupport()