Skip to content

Commit

Permalink
test: Un-comment socket_client_test.gochore: Sync some codes with ori…
Browse files Browse the repository at this point in the history
…ginal cometbft
  • Loading branch information
dudong2 committed Jan 10, 2025
1 parent 544741a commit e77a690
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 55 deletions.
17 changes: 11 additions & 6 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ type ReqRes struct {

mtx cmtsync.Mutex

done bool
// callbackInvoked as a variable to track if the callback was already
// invoked during the regular execution of the request. This variable
// allows clients to set the callback simultaneously without potentially
// invoking the callback twice by accident, once when 'SetCallback' is
// called and once during the normal request.
callbackInvoked bool

// A single callback that may be set.
// Immutable once it is set by NewReqRes.
Expand All @@ -86,8 +91,8 @@ func NewReqRes(req *types.Request) *ReqRes {
WaitGroup: waitGroup1(),
Response: nil,

done: false,
cb: nil,
callbackInvoked: false,
cb: nil,
}
}

Expand All @@ -97,7 +102,7 @@ func NewReqRes(req *types.Request) *ReqRes {
func (r *ReqRes) SetCallback(cb func(res *types.Response)) {
r.mtx.Lock()

if r.done {
if r.callbackInvoked {
r.mtx.Unlock()
cb(r.Response)
return
Expand Down Expand Up @@ -131,9 +136,9 @@ func (r *ReqRes) GetCallback() func(*types.Response) {
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
func (r *ReqRes) SetInvoked() {
r.mtx.Lock()
r.done = true
r.callbackInvoked = true
r.mtx.Unlock()
}

Expand Down
2 changes: 1 addition & 1 deletion abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (cli *grpcClient) OnStart() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()

reqres.SetDone()
reqres.SetInvoked()
reqres.Done()

// Notify client listener if set
Expand Down
30 changes: 14 additions & 16 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Unlock()
}

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
return newLocalReqRes(req, res)
}

func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes {
reqRes := NewReqRes(req)
reqRes.Response = res
reqRes.SetInvoked()
reqRes.Done()
return reqRes
}

//-------------------------------------------------------

func (app *localClient) Error() error {
Expand Down Expand Up @@ -184,8 +197,8 @@ func (app *localClient) CheckTxAsync(ctx context.Context, params *types.RequestC
res := types.ToResponseCheckTx(r)
app.Callback(req, res)
reqRes.Response = res
reqRes.SetInvoked()
reqRes.Done()
reqRes.SetDone()

// Notify reqRes listener if set
if cb := reqRes.GetCallback(); cb != nil {
Expand Down Expand Up @@ -233,18 +246,3 @@ func (app *localClient) BeginRecheckTx(ctx context.Context, params *types.Reques
func (app *localClient) EndRecheckTx(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
panic("not implemented")
}

//-------------------------------------------------------

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
return newLocalReqRes(req, res)
}

func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes {
reqRes := NewReqRes(req)
reqRes.Response = res
reqRes.Done()
reqRes.SetDone()
return reqRes
}
5 changes: 5 additions & 0 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func (cli *socketClient) Error() error {

//----------------------------------------

// SetResponseCallback sets a callback, which will be executed for each
// non-error & non-empty response from the server.
//
// NOTE: callback may get internally generated flush responses.
func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.resMtx.Lock()
cli.resCb = resCb
Expand Down Expand Up @@ -219,6 +223,7 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
}

reqres.Response = res
reqres.SetInvoked()
reqres.Done() // release waiters
cli.reqSent.Remove(next) // pop first item from linked list

Expand Down
56 changes: 28 additions & 28 deletions abci/client/socket_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,34 +166,34 @@ func (slowApp) CheckTx(context.Context, *types.RequestCheckTx) (*types.ResponseC
// set after the client completes the call into the app. Currently this
// test relies on the callback being allowed to be invoked twice if set multiple
// times, once when set early and once when set late.
// func TestCallbackInvokedWhenSetLate(t *testing.T) {
// ctx, cancel := context.WithCancel(context.Background())
// defer cancel()

// wg := &sync.WaitGroup{}
// wg.Add(1)
// app := blockedABCIApplication{
// wg: wg,
// }
// _, c := setupClientServer(t, app)
// reqRes, err := c.CheckTxAsync(ctx, &types.RequestCheckTx{}, nil)
// require.NoError(t, err)

// done := make(chan struct{})
// cb := func(_ *types.Response) {
// close(done)
// }
// reqRes.SetCallback(cb)
// app.wg.Done()
// <-done

// var called bool
// cb = func(_ *types.Response) {
// called = true
// }
// reqRes.SetCallback(cb)
// require.True(t, called)
// }
func TestCallbackInvokedWhenSetLate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wg := &sync.WaitGroup{}
wg.Add(1)
app := blockedABCIApplication{
wg: wg,
}
_, c := setupClientServer(t, app)
reqRes, err := c.CheckTxAsync(ctx, &types.RequestCheckTx{})
require.NoError(t, err)

done := make(chan struct{})
cb := func(_ *types.Response) {
close(done)
}
reqRes.SetCallback(cb)
app.wg.Done()
<-done

var called bool
cb = func(_ *types.Response) {
called = true
}
reqRes.SetCallback(cb)
require.True(t, called)
}

type blockedABCIApplication struct {
wg *sync.WaitGroup
Expand Down
8 changes: 4 additions & 4 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) {
reqRes := newReqRes(tx, abci.CodeTypeOK, abci.CheckTxType_New)
mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(
func(ctx context.Context, req *abci.RequestCheckTx) (*abciclient.ReqRes, error) {
reqRes.SetDone()
reqRes.SetInvoked()
return reqRes, nil
},
)
Expand Down Expand Up @@ -845,7 +845,7 @@ func TestMempoolSyncRecheckTxReturnError(t *testing.T) {
reqRes := newReqRes(tx, abci.CodeTypeOK, abci.CheckTxType_New)
mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(
func(ctx context.Context, req *abci.RequestCheckTx) (*abciclient.ReqRes, error) {
reqRes.SetDone()
reqRes.SetInvoked()
return reqRes, nil
},
).Once()
Expand Down Expand Up @@ -894,7 +894,7 @@ func TestMempoolAsyncRecheckTxReturnError(t *testing.T) {
reqRes := newReqRes(tx, abci.CodeTypeOK, abci.CheckTxType_New)
mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(
func(ctx context.Context, req *abci.RequestCheckTx) (*abciclient.ReqRes, error) {
reqRes.SetDone()
reqRes.SetInvoked()
return reqRes, nil
},
).Once()
Expand All @@ -914,7 +914,7 @@ func TestMempoolAsyncRecheckTxReturnError(t *testing.T) {
mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(
func(ctx context.Context, req *abci.RequestCheckTx) (*abciclient.ReqRes, error) {
reqRes := newReqRes(req.Tx, abci.CodeTypeOK, abci.CheckTxType_Recheck)
reqRes.SetDone()
reqRes.SetInvoked()
return reqRes, nil
},
).Times(4)
Expand Down

0 comments on commit e77a690

Please sign in to comment.