fix exchange partition, address comments
Signed-off-by: Wenqi Mou <[email protected]>
Tristan1900 committed Jan 22, 2025
1 parent dd4e734 commit f6d3330
Showing 16 changed files with 604 additions and 336 deletions.
5 changes: 1 addition & 4 deletions br/pkg/restore/log_client/BUILD.bazel
@@ -51,11 +51,7 @@ go_library(
"//pkg/util/codec",
"//pkg/util/redact",
"//pkg/util/sqlexec",
<<<<<<< HEAD
"//pkg/util/table-filter",
"@com_github_docker_go_units//:go-units",
=======
>>>>>>> 1c4c1af0d5 (remove filter and use tracker)
"@com_github_fatih_color//:color",
"@com_github_gogo_protobuf//proto",
"@com_github_opentracing_opentracing_go//:opentracing-go",
@@ -79,6 +75,7 @@ go_library(
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)

12 changes: 11 additions & 1 deletion br/pkg/restore/log_client/batch_meta_processor.go
@@ -84,7 +84,7 @@ func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles(
return errors.Trace(err)
}

// AddTable global schema version to trigger a full reload so every TiDB node in the cluster will get synced with
// AddPhysicalId global schema version to trigger a full reload so every TiDB node in the cluster will get synced with
// the latest schema update.
if err := rp.client.UpdateSchemaVersionFullReload(ctx); err != nil {
return errors.Trace(err)
@@ -165,6 +165,7 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(
return nil, errors.Trace(err)
}

log.Info("######### getting key", zap.Any("key", entry.E.Key))
// write cf doesn't have short value in it
if value == nil {
continue
@@ -207,7 +208,16 @@ func (mp *MetaKVInfoProcessor) ProcessBatch(
// add to table rename history
mp.tableHistoryManager.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)

// track partition information if table is partitioned
// this is needed since a table might get exchanged
if tableInfo.Partition != nil {
for _, def := range tableInfo.Partition.Definitions {
mp.tableHistoryManager.AddPartitionHistory(def.ID, dbID, tableInfo.ID, tableInfo.Name.O)
}
}

// update the id map
log.Info("######### processing table", zap.Any("tableInfo", tableInfo))
if err = mp.tableMappingManager.ProcessTableValueAndUpdateIdMapping(dbID, &tableInfo); err != nil {
return nil, errors.Trace(err)
}
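
The partition bookkeeping added above matters because of EXCHANGE PARTITION: a physical id that starts out as a partition of one table can later show up attached to another table. A hedged sketch of the history this produces, assuming the exchange moves the partition's physical id into the standalone table (the ids and names are made up; this is not code from the diff):

    // before the exchange: physical id 101 is a partition of table 100 ("t_part")
    mp.tableHistoryManager.AddPartitionHistory(101, 1, 100, "t_part")
    // after ALTER TABLE ... EXCHANGE PARTITION, a later batch may see id 101 as the
    // standalone table "t_plain", so it is recorded again as a plain table
    mp.tableHistoryManager.AddTableHistory(101, "t_plain", 1)
    // the [original, current] pair for id 101 now records both locations, which is
    // what the downstream filtering needs in order to resolve the table
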
20 changes: 15 additions & 5 deletions br/pkg/restore/log_client/client.go
@@ -948,10 +948,10 @@ func (rc *LogClient) loadSchemasMap(
func readFilteredFullBackupTables(
ctx context.Context,
s storage.ExternalStorage,
piTRTableFilter *utils.PiTRTableTracker,
piTRIdTracker *utils.PiTRIdTracker,
cipherInfo *backuppb.CipherInfo,
) (map[int64]*metautil.Table, error) {
if piTRTableFilter == nil {
if piTRIdTracker == nil {
return nil, errors.Errorf("missing pitr table tracker information")
}
metaData, err := s.ReadFile(ctx, metautil.MetaFile)
@@ -979,7 +979,7 @@ func readFilteredFullBackupTables(

tables := make(map[int64]*metautil.Table)
for _, db := range databases {
if !piTRTableFilter.ContainsDB(db.Info.ID) {
if !piTRIdTracker.ContainsDB(db.Info.ID) {
continue
}

@@ -991,7 +991,7 @@
tableAdded = true
continue
}
if !piTRTableFilter.ContainsTable(db.Info.ID, table.Info.ID) {
if !piTRIdTracker.ContainsPhysicalId(db.Info.ID, table.Info.ID) {
continue
}
tables[table.Info.ID] = table
@@ -1023,7 +1023,7 @@ type GetIDMapConfig struct {
FullBackupStorageConfig *FullBackupStorageConfig
CipherInfo *backuppb.CipherInfo
// generated at full restore step that contains all the table ids that need to restore
PiTRTableTracker *utils.PiTRTableTracker
PiTRTableTracker *utils.PiTRIdTracker
}

const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"
@@ -1967,3 +1967,13 @@ func PutRawKvWithRetry(ctx context.Context, client *rawkv.RawKVBatchClient, key,
}
return nil
}

// DropTable drops a table with the given database and table name
func (rc *LogClient) DropTable(ctx context.Context, dbName, tableName string) error {
dropSQL := "DROP TABLE IF EXISTS %n.%n"
if err := rc.unsafeSession.ExecuteInternal(ctx, dropSQL, dbName, tableName); err != nil {
return errors.Annotatef(err, "failed to drop table %s.%s", dbName, tableName)
}
log.Info("dropped table", zap.String("db", dbName), zap.String("table", tableName))
return nil
}
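
For orientation, a minimal sketch of how the new DropTable helper might be called during restore cleanup; the receiver rc, the context, and the table name are illustrative assumptions, not part of this commit:

    // hypothetical call site: drop a stale downstream table before re-creating it
    if err := rc.DropTable(ctx, "test", "t_exchanged"); err != nil {
        return errors.Trace(err)
    }
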
2 changes: 1 addition & 1 deletion br/pkg/stream/BUILD.bazel
@@ -72,7 +72,7 @@ go_test(
],
embed = [":stream"],
flaky = True,
shard_count = 48,
shard_count = 49,
deps = [
"//br/pkg/storage",
"//br/pkg/streamhelper",
5 changes: 5 additions & 0 deletions br/pkg/stream/logging_helper.go
@@ -33,6 +33,11 @@ func LogDBReplaceMap(title string, dbReplaces map[UpstreamID]*DBReplace) {
zap.String("table", tableReplace.Name),
zap.Int64("upstreamId", upstreamTableID),
zap.Int64("downstreamId", tableReplace.TableID))
for upPartId, downPartId := range tableReplace.PartitionMap {
fields = append(fields,
zap.Int64("up partition", upPartId),
zap.Int64("down partition", downPartId))
}
}
return fields
}()...)
60 changes: 41 additions & 19 deletions br/pkg/stream/table_history.go
@@ -14,40 +14,62 @@

package stream

// DBIDAndTableName stores db id and the table name to locate the table
type DBIDAndTableName struct {
DbID int64
TableName string
const defaultParentTableID = 0

// TableLocationInfo stores the table name, db id, and parent table id if is a partition
type TableLocationInfo struct {
DbID int64
TableName string
ParentTableID int64 // tracking parent table if is a partition
}

func (t *TableLocationInfo) IsPartition() bool {
return t.ParentTableID != defaultParentTableID
}

type LogBackupTableHistoryManager struct {
// maps table ID to its original and current names
// [0] is original location, [1] is current location
tableNameHistory map[int64][2]DBIDAndTableName
// record all the db id to name that were seen during log backup DDL history
// maps table/partition ID to [original, current] location info
tableNameHistory map[int64][2]TableLocationInfo
dbIdToName map[int64]string
needToBuildIdMap bool
}

func NewTableHistoryManager() *LogBackupTableHistoryManager {
return &LogBackupTableHistoryManager{
tableNameHistory: make(map[int64][2]DBIDAndTableName),
tableNameHistory: make(map[int64][2]TableLocationInfo),
dbIdToName: make(map[int64]string),
}
}

// AddTableHistory adds or updates history for a regular table
func (info *LogBackupTableHistoryManager) AddTableHistory(tableId int64, tableName string, dbID int64) {
tableLocationInfo := DBIDAndTableName{
DbID: dbID,
TableName: tableName,
locationInfo := TableLocationInfo{
DbID: dbID,
TableName: tableName,
ParentTableID: defaultParentTableID,
}
info.addHistory(tableId, locationInfo)
}

// AddPartitionHistory adds or updates history for a partition
func (info *LogBackupTableHistoryManager) AddPartitionHistory(
partitionId int64, dbID int64, parentTableID int64, parentTableName string) {
locationInfo := TableLocationInfo{
DbID: dbID,
TableName: parentTableName,
ParentTableID: parentTableID,
}
names, exists := info.tableNameHistory[tableId]
info.addHistory(partitionId, locationInfo)
}

// addHistory is a helper method to maintain the history
func (info *LogBackupTableHistoryManager) addHistory(id int64, locationInfo TableLocationInfo) {
existing, exists := info.tableNameHistory[id]
if !exists {
// first occurrence - store as original name
info.tableNameHistory[tableId] = [2]DBIDAndTableName{tableLocationInfo, tableLocationInfo}
// first occurrence - store as both original and current
info.tableNameHistory[id] = [2]TableLocationInfo{locationInfo, locationInfo}
} else {
// update current name while preserving original name
info.tableNameHistory[tableId] = [2]DBIDAndTableName{names[0], tableLocationInfo}
// update current while preserving original
info.tableNameHistory[id] = [2]TableLocationInfo{existing[0], locationInfo}
}
}

@@ -57,7 +79,7 @@ func (info *LogBackupTableHistoryManager) RecordDBIdToName(dbId int64, dbName st

// GetTableHistory returns information about all tables that have been renamed.
// Returns a map of table IDs to their original and current locations
func (info *LogBackupTableHistoryManager) GetTableHistory() map[int64][2]DBIDAndTableName {
func (info *LogBackupTableHistoryManager) GetTableHistory() map[int64][2]TableLocationInfo {
return info.tableNameHistory
}

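
To make the new history manager semantics concrete, a small sketch grounded in the code above (the ids and names are invented for illustration):

    m := NewTableHistoryManager()
    m.AddTableHistory(100, "t1", 1)          // first sighting: original == current
    m.AddPartitionHistory(101, 1, 100, "t1") // partition 101 keyed on its own id, parent table 100
    m.AddTableHistory(100, "t1_new", 2)      // later rename/move: current updated, original preserved
    hist := m.GetTableHistory()
    // hist[100][0].TableName == "t1", hist[100][1].TableName == "t1_new"
    // hist[101][0].ParentTableID == 100, i.e. id 101 is tracked as a partition of table 100
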
8 changes: 4 additions & 4 deletions br/pkg/stream/table_mapping.go
@@ -179,7 +179,7 @@ func (tm *TableMappingManager) MergeBaseDBReplace(baseMap map[UpstreamID]*DBRepl
if _, exists := existingDBReplace.TableMap[tableUpID]; !exists {
existingDBReplace.TableMap[tableUpID] = baseTableReplace
} else {
// Merge partition mappings for existing tables
// merge partition mappings for existing tables
existingTableReplace := existingDBReplace.TableMap[tableUpID]
for partUpID, partDownID := range baseTableReplace.PartitionMap {
if _, exists := existingTableReplace.PartitionMap[partUpID]; !exists {
@@ -293,18 +293,18 @@ func (tm *TableMappingManager) ReplaceTemporaryIDs(
return nil
}

func (tm *TableMappingManager) FilterDBReplaceMap(tracker *utils.PiTRTableTracker) {
func (tm *TableMappingManager) FilterDBReplaceMap(tracker *utils.PiTRIdTracker) {
// iterate through existing DBReplaceMap
for dbID, dbReplace := range tm.DBReplaceMap {
// remove entire database if not in filter
// remove entire database if not in tracker
if !tracker.ContainsDB(dbID) {
delete(tm.DBReplaceMap, dbID)
continue
}

// filter tables in this database
for tableID := range dbReplace.TableMap {
if !tracker.ContainsTable(dbID, tableID) {
if !tracker.ContainsPhysicalId(dbID, tableID) {
delete(dbReplace.TableMap, tableID)
}
}
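
As a rough usage sketch of the renamed tracker type (the field and method names follow this diff and the test file below; the concrete ids and the tm variable, a *TableMappingManager, are assumptions for illustration):

    tracker := &utils.PiTRIdTracker{
        DBIdToPhysicalId: map[int64]map[int64]struct{}{
            1: {10: struct{}{}}, // keep db 1 and, within it, physical id 10
        },
    }
    tm.FilterDBReplaceMap(tracker)
    // databases absent from the tracker are dropped from DBReplaceMap; within the
    // databases that remain, untracked table entries are removed from TableMap
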
22 changes: 11 additions & 11 deletions br/pkg/stream/table_mapping_test.go
@@ -364,7 +364,7 @@ func TestFilterDBReplaceMap(t *testing.T) {
tests := []struct {
name string
initial map[UpstreamID]*DBReplace
filter *utils.PiTRTableTracker
filter *utils.PiTRIdTracker
expected map[UpstreamID]*DBReplace
}{
{
@@ -378,8 +378,8 @@
},
},
},
filter: &utils.PiTRTableTracker{
DBIdToTable: map[int64]map[int64]struct{}{},
filter: &utils.PiTRIdTracker{
DBIdToPhysicalId: map[int64]map[int64]struct{}{},
},
expected: map[UpstreamID]*DBReplace{},
},
@@ -401,8 +401,8 @@
},
},
},
filter: &utils.PiTRTableTracker{
DBIdToTable: map[int64]map[int64]struct{}{
filter: &utils.PiTRIdTracker{
DBIdToPhysicalId: map[int64]map[int64]struct{}{
1: {10: struct{}{}},
},
},
@@ -429,8 +429,8 @@
},
},
},
filter: &utils.PiTRTableTracker{
DBIdToTable: map[int64]map[int64]struct{}{
filter: &utils.PiTRIdTracker{
DBIdToPhysicalId: map[int64]map[int64]struct{}{
1: {
10: struct{}{},
12: struct{}{},
@@ -474,8 +474,8 @@
},
},
},
filter: &utils.PiTRTableTracker{
DBIdToTable: map[int64]map[int64]struct{}{
filter: &utils.PiTRIdTracker{
DBIdToPhysicalId: map[int64]map[int64]struct{}{
1: {10: struct{}{}},
},
},
@@ -523,8 +523,8 @@
},
},
},
filter: &utils.PiTRTableTracker{
DBIdToTable: map[int64]map[int64]struct{}{
filter: &utils.PiTRIdTracker{
DBIdToPhysicalId: map[int64]map[int64]struct{}{
1: {10: struct{}{}},
2: {
20: struct{}{},
4 changes: 2 additions & 2 deletions br/pkg/task/common.go
@@ -256,8 +256,8 @@ type Config struct {
TableFilter filter.Filter `json:"-" toml:"-"`
// PiTRTableTracker generated from TableFilter during snapshot restore, it has all the db id and table id that needs
// to be restored
PiTRTableTracker *utils.PiTRTableTracker `json:"-" toml:"-"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`
PiTRTableTracker *utils.PiTRIdTracker `json:"-" toml:"-"`
SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"`
// Schemas is a database name set, to check whether the restore database has been backup
Schemas map[string]struct{}
// Tables is a table name set, to check whether the restore table has been backup