Skip to content

Commit

Permalink
Merge pull request #688 from zhyass/feature_set
Browse files Browse the repository at this point in the history
*: fix the bug loadbalance does not take effect in streaming fetch st…
  • Loading branch information
BohuTANG authored Sep 21, 2020
2 parents b6006f3 + 50bf249 commit 84715fb
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
35 changes: 31 additions & 4 deletions src/backend/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,13 @@ func TestTxnExecuteReplicaError(t *testing.T) {
func TestTxnExecuteStreamFetch(t *testing.T) {
defer leaktest.Check(t)()
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
fakedb, txnMgr, backends, addrs, cleanup := MockTxnMgr(log, 2)
fakedb, txnMgr, backends, addrs, cleanup := MockTxnMgrWithReplica(log, 2)
defer cleanup()

querys := []xcontext.QueryTuple{
xcontext.QueryTuple{Query: "select * from node1", Backend: addrs[0]},
xcontext.QueryTuple{Query: "select * from node2", Backend: addrs[1]},
xcontext.QueryTuple{Query: "select * from node3", Backend: addrs[1]},
{Query: "select * from node1", Backend: addrs[0]},
{Query: "select * from node2", Backend: addrs[1]},
{Query: "select * from node3", Backend: addrs[1]},
}

result11 := &sqltypes.Result{
Expand Down Expand Up @@ -360,6 +360,33 @@ func TestTxnExecuteStreamFetch(t *testing.T) {
assert.Equal(t, want, got)
}

// loadbalance=1.
{
fakedb.AddQueryStream(querys[0].Query, result11)
fakedb.AddQueryStream(querys[1].Query, result12)
fakedb.AddQueryStream(querys[2].Query, result12)

txn, err := txnMgr.CreateTxn(backends)
assert.Nil(t, err)
defer txn.Finish()
txn.SetIsExecOnRep(true)

rctx := &xcontext.RequestContext{
Querys: querys,
}

callbackQr := &sqltypes.Result{}
err = txn.ExecuteStreamFetch(rctx, func(qr *sqltypes.Result) error {
callbackQr.AppendResult(qr)
return nil
}, 1024*1024)
assert.Nil(t, err)

want := len(result11.Rows) + 2*len(result12.Rows)
got := len(callbackQr.Rows)
assert.Equal(t, want, got)
}

// execute error.
{
fakedb.AddQueryError(querys[0].Query, errors.New("mock.stream.query.error"))
Expand Down
3 changes: 3 additions & 0 deletions src/proxy/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (spanner *Spanner) executeWithTimeout(session *driver.Session, database str
// ExecuteStreamFetch used to execute a stream fetch query.
func (spanner *Spanner) ExecuteStreamFetch(session *driver.Session, database string, query string, node sqlparser.Statement, callback func(qr *sqltypes.Result) error) error {
log := spanner.log
conf := spanner.conf
router := spanner.router
scatter := spanner.scatter
sessions := spanner.sessions
Expand All @@ -178,6 +179,8 @@ func (spanner *Spanner) ExecuteStreamFetch(session *driver.Session, database str
}
defer txn.Finish()

txn.SetIsExecOnRep(conf.Proxy.LoadBalance != 0)

// binding.
sessions.TxnBinding(session, txn, node, query)
defer sessions.TxnUnBinding(session)
Expand Down

0 comments on commit 84715fb

Please sign in to comment.