From 84b1a7df792bdb448783a8d634c444ab45bb1895 Mon Sep 17 00:00:00 2001 From: "yulai.li" Date: Thu, 24 Dec 2020 12:23:23 +0800 Subject: [PATCH 1/3] txn: update table delta map when really lock keys --- executor/batch_point_get.go | 10 +++ executor/executor.go | 24 +++++- executor/executor_test.go | 149 ++++++++++++++++++++++++++++++++++++ executor/point_get.go | 9 +++ session/session.go | 7 +- 5 files changed, 196 insertions(+), 3 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 165cb5bb078b3..1e7975c9d8d20 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -292,6 +292,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { existKeys = make([]kv.Key, 0, len(values)) } e.values = make([][]byte, 0, len(values)) + hasKeys := false for i, key := range keys { val := values[string(key)] if len(val) == 0 { @@ -304,6 +305,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { e.values = append(e.values, val) handles = append(handles, e.handles[i]) if e.lock && rc { + hasKeys = true existKeys = append(existKeys, key) } } @@ -313,6 +315,14 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { if err != nil { return err } + if hasKeys { + // Update partition table IDs + for _, pid := range e.physIDs { + e.updateDeltaForTableID(pid) + } + // Update table ID + e.updateDeltaForTableID(e.tblInfo.ID) + } } e.handles = handles return nil diff --git a/executor/executor.go b/executor/executor.go index 8109ffcc16a15..ee20f9c2b91d9 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -192,6 +192,11 @@ func (e *baseExecutor) Next(ctx context.Context, req *chunk.Chunk) error { return nil } +func (e *baseExecutor) updateDeltaForTableID(id int64) { + txnCtx := e.ctx.GetSessionVars().TxnCtx + txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{}) +} + func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, children ...Executor) baseExecutor { e := baseExecutor{ children: children, @@ -900,6 +905,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { return nil } + hasKeys := false if req.NumRows() > 0 { iter := chunk.NewIterator4Chunk(req) for row := iter.Begin(); row != iter.End(); row = iter.Next() { @@ -915,6 +921,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { } for _, col := range cols { + hasKeys = true e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, row.GetInt64(col.Index))) } } @@ -926,7 +933,22 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { lockWaitTime = kv.LockNoWait } - return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...) + err = doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...) + if err == nil && hasKeys { + // Just update table delta when there really has keys locked. + if len(e.tblID2Handle) > 0 { + for id := range e.tblID2Handle { + e.updateDeltaForTableID(id) + } + } + if len(e.partitionedTable) > 0 { + for _, p := range e.partitionedTable { + pid := p.Meta().ID + e.updateDeltaForTableID(pid) + } + } + } + return err } func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx { diff --git a/executor/executor_test.go b/executor/executor_test.go index eaf38fc8010fd..1506481a9cf71 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6508,3 +6508,152 @@ func (s *testSuite) TestTxnRetry(c *C) { tk.MustExec("commit") tk.MustQuery("select * from t").Check(testkit.Rows("10")) } + +func issue20975Prepare(c *C, store kv.Storage) (*testkit.TestKit, *testkit.TestKit) { + tk1 := testkit.NewTestKit(c, store) + tk2 := testkit.NewTestKit(c, store) + tk1.MustExec("use test") + tk1.MustExec("drop table if exists t1, t2") + tk2.MustExec("use test") + tk1.MustExec("create table t1(id int primary key, c int)") + tk1.MustExec("insert into t1 values(1, 10), (2, 20)") + return tk1, tk2 +} + +func (s *testSuite) TestIssue20975UpdateNoChange(c *C) { + tk1, tk2 := issue20975Prepare(c, s.store) + tk1.MustExec("begin pessimistic") + tk1.MustExec("update t1 set c=c") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdate(c *C) { + tk1, tk2 := issue20975Prepare(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdatePointGet(c *C) { + tk1, tk2 := issue20975Prepare(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id=1 for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id=1 for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdateBatchPointGet(c *C) { + tk1, tk2 := issue20975Prepare(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id in (1, 2) for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id in (1, 2) for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} + +func issue20975PreparePartitionTable(c *C, store kv.Storage) (*testkit.TestKit, *testkit.TestKit) { + tk1 := testkit.NewTestKit(c, store) + tk2 := testkit.NewTestKit(c, store) + tk1.MustExec("use test") + tk1.MustExec("drop table if exists t1, t2") + tk2.MustExec("use test") + tk1.MustExec(`create table t1(id int primary key, c int) partition by range (id) ( + partition p1 values less than (10), + partition p2 values less than (20) + )`) + tk1.MustExec("insert into t1 values(1, 10), (2, 20), (11, 30), (12, 40)") + return tk1, tk2 +} + +func (s *testSuite) TestIssue20975UpdateNoChangeWithPartitionTable(c *C) { + tk1, tk2 := issue20975PreparePartitionTable(c, s.store) + tk1.MustExec("begin pessimistic") + tk1.MustExec("update t1 set c=c") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdateWithPartitionTable(c *C) { + tk1, tk2 := issue20975PreparePartitionTable(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdatePointGetWithPartitionTable(c *C) { + tk1, tk2 := issue20975PreparePartitionTable(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id=1 for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id=12 for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id=1 for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id=12 for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdateBatchPointGetWithPartitionTable(c *C) { + tk1, tk2 := issue20975PreparePartitionTable(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id in (1, 2) for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id in (11, 12) for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") + + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id in (1, 11) for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id in (1, 2) for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id in (11, 12) for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id in (1, 11) for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} diff --git a/executor/point_get.go b/executor/point_get.go index d1d0b8e11c574..2f1cff87f9a4f 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -276,6 +276,15 @@ func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) erro if err != nil { return err } + // Key need lock get table ID + var tblID int64 + if e.partInfo != nil { + tblID = e.partInfo.ID + } else { + tblID = e.tblInfo.ID + } + e.updateDeltaForTableID(tblID) + lockCtx.ValuesLock.Lock() defer lockCtx.ValuesLock.Unlock() for key, val := range lockCtx.Values { diff --git a/session/session.go b/session/session.go index 798a08a125148..e30341b5f6c4e 100644 --- a/session/session.go +++ b/session/session.go @@ -437,8 +437,11 @@ func (s *session) doCommit(ctx context.Context) error { for id := range relatedPhysicalTables { physicalTableIDs = append(physicalTableIDs, id) } - // Set this option for 2 phase commit to validate schema lease. - s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs)) + // If table delta is empty means no changes for any table. So do not need SchemaChecker. + if len(physicalTableIDs) > 0 { + // Set this option for 2 phase commit to validate schema lease. + s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs)) + } s.txn.SetOption(kv.InfoSchema, s.sessionVars.TxnCtx.InfoSchema) s.txn.SetOption(kv.CommitHook, func(info kv.TxnInfo, _ error) { s.sessionVars.LastTxnInfo = info }) if s.GetSessionVars().EnableAmendPessimisticTxn { From 301cb5430511d27b337e35aa110bd29928bd85e1 Mon Sep 17 00:00:00 2001 From: "yulai.li" Date: Thu, 24 Dec 2020 12:42:28 +0800 Subject: [PATCH 2/3] fix for lint --- executor/batch_point_get.go | 4 +++- executor/executor.go | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 1e7975c9d8d20..48fee26f266d0 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -305,7 +305,9 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { e.values = append(e.values, val) handles = append(handles, e.handles[i]) if e.lock && rc { - hasKeys = true + if !hasKeys { + hasKeys = true + } existKeys = append(existKeys, key) } } diff --git a/executor/executor.go b/executor/executor.go index ee20f9c2b91d9..95b1488d606d9 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -921,7 +921,9 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { } for _, col := range cols { - hasKeys = true + if !hasKeys { + hasKeys = true + } e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, row.GetInt64(col.Index))) } } From f27972780d4773a9ab176d2b4c79b18906b3b3a6 Mon Sep 17 00:00:00 2001 From: "yulai.li" Date: Thu, 24 Dec 2020 14:13:18 +0800 Subject: [PATCH 3/3] Fix reviews --- executor/batch_point_get.go | 15 ++++++++++----- session/session.go | 7 ++----- sessionctx/variable/session.go | 9 +++++++++ 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 48fee26f266d0..313d78d3c7fea 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -264,6 +264,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { keys[i] = key } + hasKeys := false var values map[string][]byte rc := e.ctx.GetSessionVars().IsPessimisticReadConsistency() // Lock keys (include exists and non-exists keys) before fetch all values for Repeatable Read Isolation. @@ -273,6 +274,9 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { for _, idxKey := range indexKeys { // lock the non-exist index key, using len(val) in case BatchGet result contains some zero len entries if val := handleVals[string(idxKey)]; len(val) == 0 { + if !hasKeys { + hasKeys = true + } lockKeys = append(lockKeys, idxKey) } } @@ -292,7 +296,6 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { existKeys = make([]kv.Key, 0, len(values)) } e.values = make([][]byte, 0, len(values)) - hasKeys := false for i, key := range keys { val := values[string(key)] if len(val) == 0 { @@ -312,10 +315,12 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { } } // Lock exists keys only for Read Committed Isolation. - if e.lock && rc { - err = e.lockKeys(ctx, existKeys) - if err != nil { - return err + if e.lock { + if rc { + err = e.lockKeys(ctx, existKeys) + if err != nil { + return err + } } if hasKeys { // Update partition table IDs diff --git a/session/session.go b/session/session.go index e30341b5f6c4e..798a08a125148 100644 --- a/session/session.go +++ b/session/session.go @@ -437,11 +437,8 @@ func (s *session) doCommit(ctx context.Context) error { for id := range relatedPhysicalTables { physicalTableIDs = append(physicalTableIDs, id) } - // If table delta is empty means no changes for any table. So do not need SchemaChecker. - if len(physicalTableIDs) > 0 { - // Set this option for 2 phase commit to validate schema lease. - s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs)) - } + // Set this option for 2 phase commit to validate schema lease. + s.txn.SetOption(kv.SchemaChecker, domain.NewSchemaChecker(domain.GetDomain(s), s.sessionVars.TxnCtx.SchemaVersion, physicalTableIDs)) s.txn.SetOption(kv.InfoSchema, s.sessionVars.TxnCtx.InfoSchema) s.txn.SetOption(kv.CommitHook, func(info kv.TxnInfo, _ error) { s.sessionVars.LastTxnInfo = info }) if s.GetSessionVars().EnableAmendPessimisticTxn { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 05de9ef3b8d56..49245ddcd6d4f 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -158,6 +158,9 @@ type TransactionContext struct { Isolation string LockExpire uint32 ForUpdate uint32 + + // TableDeltaMap lock to prevent potential data race + tdmLock sync.Mutex } // AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock. @@ -179,6 +182,8 @@ func (tc *TransactionContext) CollectUnchangedRowKeys(buf []kv.Key) []kv.Key { // UpdateDeltaForTable updates the delta info for some table. func (tc *TransactionContext) UpdateDeltaForTable(physicalTableID int64, delta int64, count int64, colSize map[int64]int64) { + tc.tdmLock.Lock() + defer tc.tdmLock.Unlock() if tc.TableDeltaMap == nil { tc.TableDeltaMap = make(map[int64]TableDelta) } @@ -217,13 +222,17 @@ func (tc *TransactionContext) Cleanup() { tc.DirtyDB = nil tc.Binlog = nil tc.History = nil + tc.tdmLock.Lock() tc.TableDeltaMap = nil + tc.tdmLock.Unlock() tc.pessimisticLockCache = nil } // ClearDelta clears the delta map. func (tc *TransactionContext) ClearDelta() { + tc.tdmLock.Lock() tc.TableDeltaMap = nil + tc.tdmLock.Unlock() } // GetForUpdateTS returns the ts for update.