diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 165cb5bb078b3..313d78d3c7fea 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -264,6 +264,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { keys[i] = key } + hasKeys := false var values map[string][]byte rc := e.ctx.GetSessionVars().IsPessimisticReadConsistency() // Lock keys (include exists and non-exists keys) before fetch all values for Repeatable Read Isolation. @@ -273,6 +274,9 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { for _, idxKey := range indexKeys { // lock the non-exist index key, using len(val) in case BatchGet result contains some zero len entries if val := handleVals[string(idxKey)]; len(val) == 0 { + if !hasKeys { + hasKeys = true + } lockKeys = append(lockKeys, idxKey) } } @@ -304,14 +308,27 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { e.values = append(e.values, val) handles = append(handles, e.handles[i]) if e.lock && rc { + if !hasKeys { + hasKeys = true + } existKeys = append(existKeys, key) } } // Lock exists keys only for Read Committed Isolation. - if e.lock && rc { - err = e.lockKeys(ctx, existKeys) - if err != nil { - return err + if e.lock { + if rc { + err = e.lockKeys(ctx, existKeys) + if err != nil { + return err + } + } + if hasKeys { + // Update partition table IDs + for _, pid := range e.physIDs { + e.updateDeltaForTableID(pid) + } + // Update table ID + e.updateDeltaForTableID(e.tblInfo.ID) } } e.handles = handles diff --git a/executor/executor.go b/executor/executor.go index 8109ffcc16a15..95b1488d606d9 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -192,6 +192,11 @@ func (e *baseExecutor) Next(ctx context.Context, req *chunk.Chunk) error { return nil } +func (e *baseExecutor) updateDeltaForTableID(id int64) { + txnCtx := e.ctx.GetSessionVars().TxnCtx + txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{}) +} + func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, children ...Executor) baseExecutor { e := baseExecutor{ children: children, @@ -900,6 +905,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { return nil } + hasKeys := false if req.NumRows() > 0 { iter := chunk.NewIterator4Chunk(req) for row := iter.Begin(); row != iter.End(); row = iter.Next() { @@ -915,6 +921,9 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { } for _, col := range cols { + if !hasKeys { + hasKeys = true + } e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, row.GetInt64(col.Index))) } } @@ -926,7 +935,22 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { lockWaitTime = kv.LockNoWait } - return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...) + err = doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...) + if err == nil && hasKeys { + // Just update table delta when there really has keys locked. + if len(e.tblID2Handle) > 0 { + for id := range e.tblID2Handle { + e.updateDeltaForTableID(id) + } + } + if len(e.partitionedTable) > 0 { + for _, p := range e.partitionedTable { + pid := p.Meta().ID + e.updateDeltaForTableID(pid) + } + } + } + return err } func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx { diff --git a/executor/executor_test.go b/executor/executor_test.go index eaf38fc8010fd..1506481a9cf71 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6508,3 +6508,152 @@ func (s *testSuite) TestTxnRetry(c *C) { tk.MustExec("commit") tk.MustQuery("select * from t").Check(testkit.Rows("10")) } + +func issue20975Prepare(c *C, store kv.Storage) (*testkit.TestKit, *testkit.TestKit) { + tk1 := testkit.NewTestKit(c, store) + tk2 := testkit.NewTestKit(c, store) + tk1.MustExec("use test") + tk1.MustExec("drop table if exists t1, t2") + tk2.MustExec("use test") + tk1.MustExec("create table t1(id int primary key, c int)") + tk1.MustExec("insert into t1 values(1, 10), (2, 20)") + return tk1, tk2 +} + +func (s *testSuite) TestIssue20975UpdateNoChange(c *C) { + tk1, tk2 := issue20975Prepare(c, s.store) + tk1.MustExec("begin pessimistic") + tk1.MustExec("update t1 set c=c") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdate(c *C) { + tk1, tk2 := issue20975Prepare(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdatePointGet(c *C) { + tk1, tk2 := issue20975Prepare(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id=1 for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id=1 for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdateBatchPointGet(c *C) { + tk1, tk2 := issue20975Prepare(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id in (1, 2) for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id in (1, 2) for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} + +func issue20975PreparePartitionTable(c *C, store kv.Storage) (*testkit.TestKit, *testkit.TestKit) { + tk1 := testkit.NewTestKit(c, store) + tk2 := testkit.NewTestKit(c, store) + tk1.MustExec("use test") + tk1.MustExec("drop table if exists t1, t2") + tk2.MustExec("use test") + tk1.MustExec(`create table t1(id int primary key, c int) partition by range (id) ( + partition p1 values less than (10), + partition p2 values less than (20) + )`) + tk1.MustExec("insert into t1 values(1, 10), (2, 20), (11, 30), (12, 40)") + return tk1, tk2 +} + +func (s *testSuite) TestIssue20975UpdateNoChangeWithPartitionTable(c *C) { + tk1, tk2 := issue20975PreparePartitionTable(c, s.store) + tk1.MustExec("begin pessimistic") + tk1.MustExec("update t1 set c=c") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdateWithPartitionTable(c *C) { + tk1, tk2 := issue20975PreparePartitionTable(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdatePointGetWithPartitionTable(c *C) { + tk1, tk2 := issue20975PreparePartitionTable(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id=1 for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id=12 for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id=1 for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id=12 for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} + +func (s *testSuite) TestIssue20975SelectForUpdateBatchPointGetWithPartitionTable(c *C) { + tk1, tk2 := issue20975PreparePartitionTable(c, s.store) + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id in (1, 2) for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id in (11, 12) for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") + + tk1.MustExec("begin") + tk1.MustExec("select * from t1 where id in (1, 11) for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id in (1, 2) for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id in (11, 12) for update") + tk2.MustExec("create table t2(a int)") + tk1.MustExec("commit") + + tk1.MustExec("begin pessimistic") + tk1.MustExec("select * from t1 where id in (1, 11) for update") + tk2.MustExec("drop table t2") + tk1.MustExec("commit") +} diff --git a/executor/point_get.go b/executor/point_get.go index d1d0b8e11c574..2f1cff87f9a4f 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -276,6 +276,15 @@ func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) erro if err != nil { return err } + // Key need lock get table ID + var tblID int64 + if e.partInfo != nil { + tblID = e.partInfo.ID + } else { + tblID = e.tblInfo.ID + } + e.updateDeltaForTableID(tblID) + lockCtx.ValuesLock.Lock() defer lockCtx.ValuesLock.Unlock() for key, val := range lockCtx.Values { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 05de9ef3b8d56..49245ddcd6d4f 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -158,6 +158,9 @@ type TransactionContext struct { Isolation string LockExpire uint32 ForUpdate uint32 + + // TableDeltaMap lock to prevent potential data race + tdmLock sync.Mutex } // AddUnchangedRowKey adds an unchanged row key in update statement for pessimistic lock. @@ -179,6 +182,8 @@ func (tc *TransactionContext) CollectUnchangedRowKeys(buf []kv.Key) []kv.Key { // UpdateDeltaForTable updates the delta info for some table. func (tc *TransactionContext) UpdateDeltaForTable(physicalTableID int64, delta int64, count int64, colSize map[int64]int64) { + tc.tdmLock.Lock() + defer tc.tdmLock.Unlock() if tc.TableDeltaMap == nil { tc.TableDeltaMap = make(map[int64]TableDelta) } @@ -217,13 +222,17 @@ func (tc *TransactionContext) Cleanup() { tc.DirtyDB = nil tc.Binlog = nil tc.History = nil + tc.tdmLock.Lock() tc.TableDeltaMap = nil + tc.tdmLock.Unlock() tc.pessimisticLockCache = nil } // ClearDelta clears the delta map. func (tc *TransactionContext) ClearDelta() { + tc.tdmLock.Lock() tc.TableDeltaMap = nil + tc.tdmLock.Unlock() } // GetForUpdateTS returns the ts for update.