Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: parallel checktx #3

Merged
merged 38 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
31369aa
feat: concurrent checktx
dudong2 Dec 6, 2024
44d9ac2
feat: concurrent rechecktx
dudong2 Dec 9, 2024
93f66c6
feat: checkTxAsyncReactor
dudong2 Dec 9, 2024
3175751
fix: Use Begin/EndRecheckTxSync for app_conn
dudong2 Dec 9, 2024
3099b2b
chore: revise abci.Client, Async() interfaces
dudong2 Dec 10, 2024
fc88b00
test: Fix TestCacheAfterUpdate
dudong2 Dec 11, 2024
ce06f3b
test: Fix TestClientServer
dudong2 Dec 11, 2024
bafab12
test: Fix tests related with Application
dudong2 Dec 11, 2024
2f694a1
test: Fix TestHangingAsyncCalls
dudong2 Dec 12, 2024
70b64d3
test: Fix clist_mempool_test.go
dudong2 Dec 12, 2024
ceeb20f
fix: Add handler for BeginRecheckTx/EndRecheckTx
dudong2 Dec 12, 2024
881fe44
test: Fix TestExtendVoteCalledWhenEnabled
dudong2 Dec 12, 2024
5f91d8e
test: Fix state_test.go
dudong2 Dec 12, 2024
8bcf311
test: Fix TestCallbackInvokedWhenSetEarly
dudong2 Dec 12, 2024
66aff5d
chore: Fix lint
dudong2 Dec 12, 2024
4a8e351
chore: Fix lint
dudong2 Dec 12, 2024
a0c57bd
refactor: Use *sync.WaitGroup directly
dudong2 Jan 8, 2025
88b4ae2
chore: Use CheckTxSync from mempool.reactor.Receive()
dudong2 Jan 8, 2025
f0ffeb7
chore: Remove original rechecktx logics already commented out
dudong2 Jan 8, 2025
b2b8195
chore: Sync GetGlobalCallback call with other client types
dudong2 Jan 8, 2025
502d49d
refactor: Sync with original cometbft about callback
dudong2 Jan 8, 2025
4df9287
fix: Use own mutex for ResponseCallback
dudong2 Jan 8, 2025
a977306
chore: Comment Begin/EndRecheckTx lock
dudong2 Jan 9, 2025
544741a
chore: Revert ostracon#226
dudong2 Jan 10, 2025
e77a690
test: Un-comment socket_client_test.gochore: Sync some codes with ori…
dudong2 Jan 10, 2025
721e49d
chore: Sync some codes with cometbft original
dudong2 Jan 10, 2025
05fd5f9
refactor: Sync with original cometbft
dudong2 Jan 11, 2025
7d9de48
test: Fix kvstore test
dudong2 Jan 11, 2025
c0b953d
chore: Remove useless codes
dudong2 Jan 11, 2025
b23244d
refactor: Use ResponseCheckTx pointer directly
dudong2 Jan 12, 2025
529386f
refactor: Use original cometbft codes for rpc/BroadcastTxSync
dudong2 Jan 12, 2025
a506c24
refactor: Use original cometbft codes for rpc/BroadcastTxCommit
dudong2 Jan 12, 2025
0efabd4
refactor: Sync with original cometbft for mempool/CheckTxSync
dudong2 Jan 12, 2025
881d172
test: Fix test
dudong2 Jan 12, 2025
3c7ad6e
refactor: Sync with original cometbft
dudong2 Jan 12, 2025
224bc7c
refactor: Simplify CheckTxAsync
dudong2 Jan 12, 2025
402df98
refactor: Sync with original cometbft for reqResCb
dudong2 Jan 12, 2025
a1b84ce
chore: Fix comments
dudong2 Jan 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 41 additions & 43 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,16 @@ type Client interface {
// with the exception of `CheckTxAsync` which we maintain
// for the v0 mempool. We should explore refactoring the
// mempool to remove this vestige behavior.
SetResponseCallback(Callback)
CheckTxAsync(context.Context, *types.RequestCheckTx) (*ReqRes, error)
SetGlobalCallback(GlobalCallback)
GetGlobalCallback() GlobalCallback
zsystm marked this conversation as resolved.
Show resolved Hide resolved

CheckTxSync(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error)
BeginRecheckTxSync(context.Context, *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) // Signals the beginning of rechecking
EndRecheckTxSync(context.Context, *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) // Signals the end of rechecking

CheckTxAsync(context.Context, *types.RequestCheckTx, ResponseCallback) (*ReqRes, error)
BeginRecheckTxAsync(context.Context, *types.RequestBeginRecheckTx, ResponseCallback) (*ReqRes, error)
EndRecheckTxAsync(context.Context, *types.RequestEndRecheckTx, ResponseCallback) (*ReqRes, error)
}

//----------------------------------------
Expand All @@ -57,51 +65,31 @@ func NewClient(addr, transport string, mustConnect bool) (client Client, err err
return
}

type Callback func(*types.Request, *types.Response)
type GlobalCallback func(*types.Request, *types.Response)
zsystm marked this conversation as resolved.
Show resolved Hide resolved
type ResponseCallback func(*types.Response)

type ReqRes struct {
*types.Request
*sync.WaitGroup
*types.Response // Not set atomically, so be sure to use WaitGroup.

mtx cmtsync.Mutex

// callbackInvoked as a variable to track if the callback was already
// invoked during the regular execution of the request. This variable
// allows clients to set the callback simultaneously without potentially
// invoking the callback twice by accident, once when 'SetCallback' is
// called and once during the normal request.
callbackInvoked bool
cb func(*types.Response) // A single callback that may be set.
wg *sync.WaitGroup
zsystm marked this conversation as resolved.
Show resolved Hide resolved
done bool
cb func(*types.Response) // A single callback that may be set.
zsystm marked this conversation as resolved.
Show resolved Hide resolved
}

func NewReqRes(req *types.Request) *ReqRes {
func NewReqRes(req *types.Request, cb ResponseCallback) *ReqRes {
return &ReqRes{
Request: req,
WaitGroup: waitGroup1(),
Response: nil,
Request: req,
Response: nil,

callbackInvoked: false,
cb: nil,
wg: waitGroup1(),
done: false,
cb: cb,
}
}

// Sets sets the callback. If reqRes is already done, it will call the cb
// immediately. Note, reqRes.cb should not change if reqRes.done and only one
// callback is supported.
func (r *ReqRes) SetCallback(cb func(res *types.Response)) {
r.mtx.Lock()

if r.callbackInvoked {
r.mtx.Unlock()
cb(r.Response)
return
}

r.cb = cb
r.mtx.Unlock()
}

zsystm marked this conversation as resolved.
Show resolved Hide resolved
// InvokeCallback invokes a thread-safe execution of the configured callback
// if non-nil.
func (r *ReqRes) InvokeCallback() {
Expand All @@ -111,19 +99,29 @@ func (r *ReqRes) InvokeCallback() {
if r.cb != nil {
r.cb(r.Response)
}
r.callbackInvoked = true
}

// GetCallback returns the configured callback of the ReqRes object which may be
// nil. Note, it is not safe to concurrently call this in cases where it is
// marked done and SetCallback is called before calling GetCallback as that
// will invoke the callback twice and create a potential race condition.
//
// ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) {
func (r *ReqRes) SetDone(res *types.Response) (set bool) {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.cb
// TODO should we panic if it's already done?
set = !r.done
if set {
r.Response = res
r.done = true
r.wg.Done()
}
r.mtx.Unlock()

// NOTE `r.cb` is immutable so we're safe to access it at here without `mtx`
if set && r.cb != nil {
r.cb(res)
}

return set
}

func (r *ReqRes) Wait() {
r.wg.Wait()
}

func waitGroup1() (wg *sync.WaitGroup) {
Expand Down
159 changes: 92 additions & 67 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,21 @@ type grpcClient struct {
service.BaseService
mustConnect bool

client types.ABCIClient
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
client types.ABCIClient
conn *grpc.ClientConn

mtx sync.Mutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
mtx sync.Mutex
addr string
err error

globalCbMtx sync.Mutex
globalCb func(*types.Request, *types.Response) // listens to all callbacks
zsystm marked this conversation as resolved.
Show resolved Hide resolved
}

func NewGRPCClient(addr string, mustConnect bool) Client {
cli := &grpcClient{
addr: addr,
mustConnect: mustConnect,
// Buffering the channel is needed to make calls appear asynchronous,
// which is required when the caller makes multiple async calls before
// processing callbacks (e.g. due to holding locks). 64 means that a
// caller can make up to 64 async calls before a callback must be
// processed (otherwise it deadlocks). It also means that we can make 64
// gRPC calls while processing a slow callback at the channel head.
chReqRes: make(chan *ReqRes, 64),
}
cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli)
return cli
Expand All @@ -58,33 +52,6 @@ func (cli *grpcClient) OnStart() error {
return err
}

// This processes asynchronous request/response messages and dispatches
// them to callbacks.
go func() {
// Use a separate function to use defer for mutex unlocks (this handles panics)
callCb := func(reqres *ReqRes) {
cli.mtx.Lock()
defer cli.mtx.Unlock()

reqres.Done()

// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, reqres.Response)
}

// Notify reqRes listener if set
reqres.InvokeCallback()
}
for reqres := range cli.chReqRes {
if reqres != nil {
callCb(reqres)
} else {
cli.Logger.Error("Received nil reqres")
}
}
}()

zsystm marked this conversation as resolved.
Show resolved Hide resolved
RETRY_LOOP:
for {
conn, err := grpc.NewClient(cli.addr,
Expand Down Expand Up @@ -125,7 +92,6 @@ func (cli *grpcClient) OnStop() {
if cli.conn != nil {
cli.conn.Close()
}
close(cli.chReqRes)
}

func (cli *grpcClient) StopForError(err error) {
Expand All @@ -151,32 +117,36 @@ func (cli *grpcClient) Error() error {
return cli.err
}

// Set listener for all responses
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
cli.resCb = resCb
cli.mtx.Unlock()
func (cli *grpcClient) SetGlobalCallback(globalCb GlobalCallback) {
cli.globalCbMtx.Lock()
cli.globalCb = globalCb
cli.globalCbMtx.Unlock()
}

func (cli *grpcClient) GetGlobalCallback() (cb GlobalCallback) {
cli.globalCbMtx.Lock()
cb = cli.globalCb
cli.globalCbMtx.Unlock()
return cb
}

//----------------------------------------

func (cli *grpcClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil, err
}
return cli.finishAsyncCall(types.ToRequestCheckTx(req), &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}), nil
}
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response, cb ResponseCallback) *ReqRes {
reqRes := NewReqRes(req, cb)

// goroutine for callbacks
go func() {
set := reqRes.SetDone(res)
if set {
// Notify client listener if set
if globalCb := cli.GetGlobalCallback(); globalCb != nil {
globalCb(req, res)
}
}
}()

// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
reqres := NewReqRes(req)
reqres.Response = res
cli.chReqRes <- reqres // use channel for async responses, since they must be ordered
return reqres
return reqRes
}

//----------------------------------------
Expand All @@ -194,10 +164,6 @@ func (cli *grpcClient) Info(ctx context.Context, req *types.RequestInfo) (*types
return cli.client.Info(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
return cli.client.Query(ctx, types.ToRequestQuery(req).GetQuery(), grpc.WaitForReady(true))
}
Expand Down Expand Up @@ -245,3 +211,62 @@ func (cli *grpcClient) VerifyVoteExtension(ctx context.Context, req *types.Reque
func (cli *grpcClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
return cli.client.FinalizeBlock(ctx, types.ToRequestFinalizeBlock(req).GetFinalizeBlock(), grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTxSync(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) BeginRecheckTxSync(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
reqres, _ := cli.BeginRecheckTxAsync(ctx, params, nil)
reqres.Wait()
return reqres.Response.GetBeginRecheckTx(), cli.Error()
}

func (cli *grpcClient) EndRecheckTxSync(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
reqres, _ := cli.EndRecheckTxAsync(ctx, params, nil)
reqres.Wait()
return reqres.Response.GetEndRecheckTx(), cli.Error()
}

func (cli *grpcClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx, cb ResponseCallback) (*ReqRes, error) {
res, err := cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil, err
}
return cli.finishAsyncCall(types.ToRequestCheckTx(req), &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}, cb), nil
}

func (cli *grpcClient) BeginRecheckTxAsync(ctx context.Context, params *types.RequestBeginRecheckTx, cb ResponseCallback) (*ReqRes, error) {
req := types.ToRequestBeginRecheckTx(params)
res, err := cli.client.BeginRecheckTx(ctx, req.GetBeginRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginRecheckTx{BeginRecheckTx: res}}, cb), nil
}

func (cli *grpcClient) EndRecheckTxAsync(ctx context.Context, params *types.RequestEndRecheckTx, cb ResponseCallback) (*ReqRes, error) {
req := types.ToRequestEndRecheckTx(params)
res, err := cli.client.EndRecheckTx(ctx, req.GetEndRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndRecheckTx{EndRecheckTx: res}}, cb), nil
}

func (cli *grpcClient) CheckTxSyncForApp(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
panic("not implemented")
}

func (cli *grpcClient) CheckTxAsyncForApp(context.Context, *types.RequestCheckTx, types.CheckTxCallback) {
panic("not implemented")
}

func (cli *grpcClient) BeginRecheckTx(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
panic("not implemented")
}

func (cli *grpcClient) EndRecheckTx(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
panic("not implemented")
}
Loading
Loading