Skip to content

Commit

Permalink
cherry-pick
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 committed Jan 24, 2025
1 parent 5e39597 commit fd7174c
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
17 changes: 14 additions & 3 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
if r != nil {
e.IndexLookUpJoin.finished.Store(true)
if e.cancelFunc != nil {
e.cancelFunc()
}
err := fmt.Errorf("%v", r)

if !e.panicErr.Load() {
Expand All @@ -212,9 +215,6 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
task := &indexHashJoinTask{err: err}
e.taskCh <- task
}
if e.cancelFunc != nil {
e.cancelFunc()
}
}
e.workerWg.Done()
}
Expand Down Expand Up @@ -367,6 +367,11 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) {
failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() {
err = errors.New("mockIndexHashJoinOuterWorkerErr")
})
failpoint.Inject("testIssue54055_1", func(val failpoint.Value) {
if val.(bool) {
err = errors.New("testIssue54055_1")
}
})
if err != nil {
task = &indexHashJoinTask{err: err}
if ow.keepOuterOrder {
Expand Down Expand Up @@ -670,6 +675,12 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH
close(resultCh)
}
}()
failpoint.Inject("testIssue54055_2", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(10 * time.Millisecond)
panic("testIssue54055_2")
}
})
var joinStartTime time.Time
if iw.stats != nil {
start := time.Now()
Expand Down
27 changes: 27 additions & 0 deletions executor/index_lookup_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,30 @@ func TestIssue54688(t *testing.T) {
rs.Close()
}
}

func TestIssue54055(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t, s;")
tk.MustExec("create table t(a int, index(a));")
tk.MustExec("create table s(a int, index(a));")
tk.MustExec("insert into t values(1), (2), (3), (4), (5), (6), (7), (8), (9), (10), (11), (12), (13), (14), (15), (16), (17), (18), (19), (20), (21), (22), (23), (24), (25), (26), (27), (28), (29), (30), (31), (32), (33), (34), (35), (36), (37), (38), (39), (40), (41), (42), (43), (44), (45), (46), (47), (48), (49), (50), (51), (52), (53), (54), (55), (56), (57), (58), (59), (60), (61), (62), (63), (64), (65), (66), (67), (68), (69), (70), (71), (72), (73), (74), (75), (76), (77), (78), (79), (80), (81), (82), (83), (84), (85), (86), (87), (88), (89), (90), (91), (92), (93), (94), (95), (96), (97), (98), (99), (100), (101), (102), (103), (104), (105), (106), (107), (108), (109), (110), (111), (112), (113), (114), (115), (116), (117), (118), (119), (120), (121), (122), (123), (124), (125), (126), (127), (128);")
tk.MustExec("insert into s values(1), (128);")

tk.MustExec("set @@tidb_max_chunk_size=32;")
tk.MustExec("set @@tidb_index_join_batch_size=32;")
tk.MustExec("set @@tidb_index_lookup_join_concurrency=1;")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIssue54055_1", "2*return(false)->1*return(true)"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIssue54055_2", "return(true)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIssue54055_1"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIssue54055_2"))
}()
rs, err := tk.Exec("select /*+ INL_HASH_JOIN(s) */ * from t join s on t.a=s.a order by t.a;")
require.NoError(t, err)
_, err = session.GetRows4Test(context.Background(), nil, rs)
require.NotNil(t, err)
rs.Close()
}

0 comments on commit fd7174c

Please sign in to comment.