Skip to content

Commit

Permalink
chore: Revert ostracon#226
Browse files Browse the repository at this point in the history
  • Loading branch information
dudong2 committed Jan 9, 2025
1 parent a977306 commit c60360d
Show file tree
Hide file tree
Showing 10 changed files with 1,184 additions and 1,074 deletions.
58 changes: 36 additions & 22 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,14 @@ type Client interface {
// for the v0 mempool. We should explore refactoring the
// mempool to remove this vestige behavior.
SetResponseCallback(Callback)
GetResponseCallback() Callback

CheckTxSync(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error)
BeginRecheckTxSync(context.Context, *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) // Signals the beginning of rechecking
EndRecheckTxSync(context.Context, *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) // Signals the end of rechecking

CheckTxAsync(context.Context, *types.RequestCheckTx, CheckTxCallback) (*ReqRes, error)
BeginRecheckTxAsync(context.Context, *types.RequestBeginRecheckTx, CheckTxCallback) (*ReqRes, error)
EndRecheckTxAsync(context.Context, *types.RequestEndRecheckTx, CheckTxCallback) (*ReqRes, error)
CheckTxAsync(context.Context, *types.RequestCheckTx) (*ReqRes, error)
BeginRecheckTxAsync(context.Context, *types.RequestBeginRecheckTx) (*ReqRes, error)
EndRecheckTxAsync(context.Context, *types.RequestEndRecheckTx) (*ReqRes, error)
}

//----------------------------------------
Expand All @@ -66,7 +65,6 @@ func NewClient(addr, transport string, mustConnect bool) (client Client, err err
}

type Callback func(*types.Request, *types.Response)
type CheckTxCallback func(*types.Response)

type ReqRes struct {
*types.Request
Expand All @@ -82,17 +80,33 @@ type ReqRes struct {
cb func(*types.Response)
}

func NewReqRes(req *types.Request, cb CheckTxCallback) *ReqRes {
func NewReqRes(req *types.Request) *ReqRes {
return &ReqRes{
Request: req,
WaitGroup: waitGroup1(),
Response: nil,

done: false,
cb: cb,
cb: nil,
}
}

// Sets sets the callback. If reqRes is already done, it will call the cb
// immediately. Note, reqRes.cb should not change if reqRes.done and only one
// callback is supported.
func (r *ReqRes) SetCallback(cb func(res *types.Response)) {
r.mtx.Lock()

if r.done {
r.mtx.Unlock()
cb(r.Response)
return
}

r.cb = cb
r.mtx.Unlock()
}

// InvokeCallback invokes a thread-safe execution of the configured callback
// if non-nil.
func (r *ReqRes) InvokeCallback() {
Expand All @@ -104,23 +118,23 @@ func (r *ReqRes) InvokeCallback() {
}
}

func (r *ReqRes) SetDone(res *types.Response) (set bool) {
// GetCallback returns the configured callback of the ReqRes object which may be
// nil. Note, it is not safe to concurrently call this in cases where it is
// marked done and SetCallback is called before calling GetCallback as that
// will invoke the callback twice and create a potential race condition.
//
// ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) {
r.mtx.Lock()
defer r.mtx.Unlock()
return r.cb
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
r.mtx.Lock()
// TODO should we panic if it's already done?
set = !r.done
if set {
r.Response = res
r.done = true
r.Done()
}
r.done = true
r.mtx.Unlock()

// NOTE `r.cb` is immutable so we're safe to access it at here without `mtx`
if set && r.cb != nil {
r.cb(res)
}

return set
}

func waitGroup1() (wg *sync.WaitGroup) {
Expand Down
103 changes: 64 additions & 39 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,27 @@ type grpcClient struct {
service.BaseService
mustConnect bool

client types.ABCIClient
conn *grpc.ClientConn
client types.ABCIClient
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool

mtx sync.Mutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
resMtx sync.Mutex
mtx sync.Mutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
}

func NewGRPCClient(addr string, mustConnect bool) Client {
cli := &grpcClient{
addr: addr,
mustConnect: mustConnect,
// Buffering the channel is needed to make calls appear asynchronous,
// which is required when the caller makes multiple async calls before
// processing callbacks (e.g. due to holding locks). 64 means that a
// caller can make up to 64 async calls before a callback must be
// processed (otherwise it deadlocks). It also means that we can make 64
// gRPC calls while processing a slow callback at the channel head.
chReqRes: make(chan *ReqRes, 64),
}
cli.BaseService = *service.NewBaseService(nil, "grpcClient", cli)
return cli
Expand All @@ -51,6 +58,36 @@ func (cli *grpcClient) OnStart() error {
return err
}

// This processes asynchronous request/response messages and dispatches
// them to callbacks.
go func() {
// Use a separate function to use defer for mutex unlocks (this handles panics)
callCb := func(reqres *ReqRes) {
cli.mtx.Lock()
defer cli.mtx.Unlock()

reqres.SetDone()
reqres.Done()

// Notify client listener if set
if cli.resCb != nil {
cli.resCb(reqres.Request, reqres.Response)
}

// Notify reqRes listener if set
if cb := reqres.GetCallback(); cb != nil {
cb(reqres.Response)
}
}
for reqres := range cli.chReqRes {
if reqres != nil {
callCb(reqres)
} else {
cli.Logger.Error("Received nil reqres")
}
}
}()

RETRY_LOOP:
for {
conn, err := grpc.NewClient(cli.addr,
Expand Down Expand Up @@ -91,6 +128,7 @@ func (cli *grpcClient) OnStop() {
if cli.conn != nil {
cli.conn.Close()
}
close(cli.chReqRes)
}

func (cli *grpcClient) StopForError(err error) {
Expand All @@ -116,36 +154,23 @@ func (cli *grpcClient) Error() error {
return cli.err
}

// Set listener for all responses
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.resMtx.Lock()
cli.mtx.Lock()
cli.resCb = resCb
cli.resMtx.Unlock()
}

func (cli *grpcClient) GetResponseCallback() Callback {
cli.resMtx.Lock()
cb := cli.resCb
cli.resMtx.Unlock()
return cb
cli.mtx.Unlock()
}

//----------------------------------------

func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response, cb CheckTxCallback) *ReqRes {
reqRes := NewReqRes(req, cb)

// goroutine for callbacks
go func() {
set := reqRes.SetDone(res)
if set {
// Notify client listener if set
if cb := cli.GetResponseCallback(); cb != nil {
cb(req, res)
}
}
}()

return reqRes
// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
reqres := NewReqRes(req)
reqres.Response = res
cli.chReqRes <- reqres // use channel for async responses, since they must be ordered
return reqres
}

//----------------------------------------
Expand Down Expand Up @@ -216,42 +241,42 @@ func (cli *grpcClient) CheckTxSync(ctx context.Context, req *types.RequestCheckT
}

func (cli *grpcClient) BeginRecheckTxSync(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
reqres, _ := cli.BeginRecheckTxAsync(ctx, params, nil)
reqres, _ := cli.BeginRecheckTxAsync(ctx, params)
reqres.Wait()
return reqres.Response.GetBeginRecheckTx(), cli.Error()
}

func (cli *grpcClient) EndRecheckTxSync(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
reqres, _ := cli.EndRecheckTxAsync(ctx, params, nil)
reqres, _ := cli.EndRecheckTxAsync(ctx, params)
reqres.Wait()
return reqres.Response.GetEndRecheckTx(), cli.Error()
}

func (cli *grpcClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx, cb CheckTxCallback) (*ReqRes, error) {
func (cli *grpcClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil, err
}
return cli.finishAsyncCall(types.ToRequestCheckTx(req), &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}, cb), nil
return cli.finishAsyncCall(types.ToRequestCheckTx(req), &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}), nil
}

func (cli *grpcClient) BeginRecheckTxAsync(ctx context.Context, params *types.RequestBeginRecheckTx, cb CheckTxCallback) (*ReqRes, error) {
func (cli *grpcClient) BeginRecheckTxAsync(ctx context.Context, params *types.RequestBeginRecheckTx) (*ReqRes, error) {
req := types.ToRequestBeginRecheckTx(params)
res, err := cli.client.BeginRecheckTx(ctx, req.GetBeginRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginRecheckTx{BeginRecheckTx: res}}, cb), nil
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginRecheckTx{BeginRecheckTx: res}}), nil
}

func (cli *grpcClient) EndRecheckTxAsync(ctx context.Context, params *types.RequestEndRecheckTx, cb CheckTxCallback) (*ReqRes, error) {
func (cli *grpcClient) EndRecheckTxAsync(ctx context.Context, params *types.RequestEndRecheckTx) (*ReqRes, error) {
req := types.ToRequestEndRecheckTx(params)
res, err := cli.client.EndRecheckTx(ctx, req.GetEndRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndRecheckTx{EndRecheckTx: res}}, cb), nil
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndRecheckTx{EndRecheckTx: res}}), nil
}

func (cli *grpcClient) CheckTxSyncForApp(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
Expand Down
84 changes: 49 additions & 35 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type localClient struct {
mtx *cmtsync.Mutex
types.Application
Callback
resMtx cmtsync.Mutex
}

var _ Client = (*localClient)(nil)
Expand All @@ -40,26 +39,9 @@ func NewLocalClient(mtx *cmtsync.Mutex, app types.Application) Client {
}

func (app *localClient) SetResponseCallback(cb Callback) {
app.resMtx.Lock()
app.mtx.Lock()
app.Callback = cb
app.resMtx.Unlock()
}

func (app *localClient) GetResponseCallback() (cb Callback) {
app.resMtx.Lock()
cb = app.Callback
app.resMtx.Unlock()
return cb
}

func (app *localClient) done(reqRes *ReqRes, res *types.Response) *ReqRes {
set := reqRes.SetDone(res)
if set {
if cb := app.GetResponseCallback(); cb != nil {
cb(reqRes.Request, res)
}
}
return reqRes
app.mtx.Unlock()
}

//-------------------------------------------------------
Expand Down Expand Up @@ -193,30 +175,47 @@ func (app *localClient) EndRecheckTxSync(ctx context.Context, req *types.Request
return app.Application.EndRecheckTx(ctx, req)
}

func (app *localClient) CheckTxAsync(ctx context.Context, params *types.RequestCheckTx, cb CheckTxCallback) (*ReqRes, error) {
// TODO(dudong2): params -> req
func (app *localClient) CheckTxAsync(ctx context.Context, params *types.RequestCheckTx) (*ReqRes, error) {
req := types.ToRequestCheckTx(params)
reqRes := NewReqRes(req, cb)
reqRes := NewReqRes(req)

app.Application.CheckTxAsyncForApp(ctx, params, func(r *types.ResponseCheckTx) {
res := types.ToResponseCheckTx(r)
app.done(reqRes, res)
app.Callback(req, res)
reqRes.Response = res
reqRes.Done()
reqRes.SetDone()

// Notify reqRes listener if set
if cb := reqRes.GetCallback(); cb != nil {
cb(res)
}
})

return reqRes, nil
}

func (app *localClient) BeginRecheckTxAsync(ctx context.Context, req *types.RequestBeginRecheckTx, cb CheckTxCallback) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
reqRes := NewReqRes(types.ToRequestBeginRecheckTx(req), cb)
res, _ := app.Application.BeginRecheckTx(ctx, req)
return app.done(reqRes, types.ToResponseBeginRecheckTx(res)), nil
func (app *localClient) BeginRecheckTxAsync(ctx context.Context, req *types.RequestBeginRecheckTx) (*ReqRes, error) {
res, err := app.Application.BeginRecheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestBeginRecheckTx(req),
types.ToResponseBeginRecheckTx(res),
), nil
}

func (app *localClient) EndRecheckTxAsync(ctx context.Context, req *types.RequestEndRecheckTx, cb CheckTxCallback) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
reqRes := NewReqRes(types.ToRequestEndRecheckTx(req), cb)
res, _ := app.Application.EndRecheckTx(ctx, req)
return app.done(reqRes, types.ToResponseEndRecheckTx(res)), nil
func (app *localClient) EndRecheckTxAsync(ctx context.Context, req *types.RequestEndRecheckTx) (*ReqRes, error) {
res, err := app.Application.EndRecheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestEndRecheckTx(req),
types.ToResponseEndRecheckTx(res),
), nil
}

func (app *localClient) CheckTxSyncForApp(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
Expand All @@ -234,3 +233,18 @@ func (app *localClient) BeginRecheckTx(ctx context.Context, params *types.Reques
func (app *localClient) EndRecheckTx(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
panic("not implemented")
}

//-------------------------------------------------------

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
return newLocalReqRes(req, res)
}

func newLocalReqRes(req *types.Request, res *types.Response) *ReqRes {
reqRes := NewReqRes(req)
reqRes.Response = res
reqRes.Done()
reqRes.SetDone()
return reqRes
}
Loading

0 comments on commit c60360d

Please sign in to comment.