Skip to content
This repository has been archived by the owner on Feb 23, 2024. It is now read-only.

Commit

Permalink
Fix for the buggy broadcasting logic (#140)
Browse files Browse the repository at this point in the history
* Fix for the buggy broadcasting logic

* Forgot errors

* Add block timeout
  • Loading branch information
Vizualni authored May 4, 2022
1 parent ba9bb20 commit ce29236
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 9 deletions.
71 changes: 62 additions & 9 deletions client/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,65 @@ import (
tmtypes "github.com/tendermint/tendermint/types"
)

func (cc *ChainClient) BroadcastTx(ctx context.Context, tx []byte) (res *sdk.TxResponse, err error) {
const (
defaultBroadcastWaitTimeout = 10 * time.Minute
)

func (cc *ChainClient) BroadcastTx(ctx context.Context, tx []byte) (*sdk.TxResponse, error) {
var (
blockTimeout time.Duration = defaultBroadcastWaitTimeout
err error
)

if cc.Config.BlockTimeout != "" {
blockTimeout, err = time.ParseDuration(cc.Config.BlockTimeout)
if err != nil {
// Did you call Validate() method on ChainClientConfig struct
// before coming here?
return nil, err
}
}

return broadcastTx(
ctx,
cc.RPCClient,
cc.Codec.TxConfig.TxDecoder(),
tx,
blockTimeout,
)
}

type rpcTxBroadcaster interface {
Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error)
BroadcastTxSync(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTx, error)

// TODO: implement commit and async as well
// BroadcastTxCommit(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTxCommit, error)
// BroadcastTxAsync(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTx, error)
}

// broadcastTx broadcasts a TX and then waits for the TX to be included in the block.
// The waiting will either be canceled after the waitTimeout has run out or the context
// exited.
func broadcastTx(
ctx context.Context,
broadcaster rpcTxBroadcaster,
txDecoder sdk.TxDecoder,
tx []byte,
waitTimeout time.Duration,
) (*sdk.TxResponse, error) {
// broadcast tx sync waits for check tx to pass
// NOTE: this can return w/ a timeout
// need to investigate if this will leave the tx
// in the mempool or we can retry the broadcast at that
// point
syncRes, err := cc.RPCClient.BroadcastTxSync(ctx, tx)
if errRes := CheckTendermintError(err, tx); errRes != nil || syncRes == nil {
return errRes, nil
syncRes, err := broadcaster.BroadcastTxSync(ctx, tx)
if err != nil {
errRes := CheckTendermintError(err, tx)
if errRes != nil {
return errRes, nil
}
return nil, err
}

if syncRes.Codespace == sdkerrors.RootCodespace && syncRes.Code == sdkerrors.ErrWrongSequence.ABCICode() {
Expand All @@ -36,23 +86,26 @@ func (cc *ChainClient) BroadcastTx(ctx context.Context, tx []byte) (res *sdk.TxR
// if not, we need to find a new way to block until inclusion in a block

// wait for tx to be included in a block
exitAfter := time.After(waitTimeout)
for {
select {
case <-exitAfter:
return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, ErrTimeoutAfterWaitingForTxBroadcast)
// TODO: this is potentially less than optimal and may
// be better as something configurable
case <-time.After(time.Millisecond * 100):
resTx, err := cc.RPCClient.Tx(ctx, syncRes.Hash, false)
resTx, err := broadcaster.Tx(ctx, syncRes.Hash, false)
if err == nil {
return cc.mkTxResult(resTx)
return mkTxResult(txDecoder, resTx)
}
case <-ctx.Done():
return
return nil, ctx.Err()
}
}
}

func (cc *ChainClient) mkTxResult(resTx *ctypes.ResultTx) (*sdk.TxResponse, error) {
txb, err := cc.Codec.TxConfig.TxDecoder()(resTx.Tx)
func mkTxResult(txDecoder sdk.TxDecoder, resTx *ctypes.ResultTx) (*sdk.TxResponse, error) {
txb, err := txDecoder(resTx.Tx)
if err != nil {
return nil, err
}
Expand Down
146 changes: 146 additions & 0 deletions client/broadcast_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package client

import (
"context"
"errors"
"testing"
"time"

codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/assert"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
)

var (
errExpected = errors.New("expected unexpected error ;)")
)

type myFakeMsg struct {
Value string
}

func (m myFakeMsg) Reset() {}
func (m myFakeMsg) ProtoMessage() {}
func (m myFakeMsg) String() string { return "doesn't matter" }
func (m myFakeMsg) ValidateBasic() error { return nil }
func (m myFakeMsg) GetSigners() []sdk.AccAddress { return []sdk.AccAddress{sdk.AccAddress(`hello`)} }

type myFakeTx struct {
msgs []myFakeMsg
}

func (m myFakeTx) GetMsgs() (msgs []sdk.Msg) {
for _, msg := range m.msgs {
msgs = append(msgs, msg)
}
return
}
func (m myFakeTx) ValidateBasic() error { return nil }
func (m myFakeTx) AsAny() *codectypes.Any { return &codectypes.Any{} }

type fakeBroadcaster struct {
tx func(context.Context, []byte, bool) (*ctypes.ResultTx, error)
broadcastSync func(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTx, error)
}

func (f fakeBroadcaster) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) {
if f.tx == nil {
return nil, nil
}
return f.tx(ctx, hash, prove)
}

func (f fakeBroadcaster) BroadcastTxSync(ctx context.Context, tx tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) {
if f.broadcastSync == nil {
return nil, nil
}
return f.broadcastSync(ctx, tx)
}

func TestBroadcast(t *testing.T) {
ctx := context.Background()

for _, tt := range []struct {
name string
broadcaster fakeBroadcaster
txDecoder sdk.TxDecoder
txBytes []byte
waitTimeout time.Duration
expectedRes *sdk.TxResponse
expectedErr error
}{
{
name: "simple success returns result",
broadcaster: fakeBroadcaster{
broadcastSync: func(_ context.Context, _ tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) {
return &ctypes.ResultBroadcastTx{
Code: 1,
Hash: []byte(`123bob`),
}, nil
},
tx: func(_ context.Context, hash []byte, _ bool) (*ctypes.ResultTx, error) {
assert.Equal(t, []byte(`123bob`), hash)
return &ctypes.ResultTx{}, nil
},
},
txDecoder: func(txBytes []byte) (sdk.Tx, error) {
return myFakeTx{
[]myFakeMsg{{"hello"}},
}, nil
},
expectedRes: &sdk.TxResponse{
Tx: &codectypes.Any{},
},
},
{
name: "success but timed out while waiting for tx",
waitTimeout: time.Microsecond,
broadcaster: fakeBroadcaster{
broadcastSync: func(_ context.Context, _ tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) {
return &ctypes.ResultBroadcastTx{
Code: 1,
Hash: []byte(`123bob`),
}, nil
},
tx: func(_ context.Context, hash []byte, _ bool) (*ctypes.ResultTx, error) {
<-time.After(time.Second)
// return doesn't matter because it will timeout before return will make sense
return nil, nil
},
},
expectedErr: ErrTimeoutAfterWaitingForTxBroadcast,
},
{
name: "broadcasting returns an error",
broadcaster: fakeBroadcaster{
broadcastSync: func(_ context.Context, _ tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) {
return nil, errExpected
},
},
expectedErr: errExpected,
},
} {
t.Run(tt.name, func(t *testing.T) {
duration := 1 * time.Second
if tt.waitTimeout > 0 {
duration = tt.waitTimeout
}
gotRes, gotErr := broadcastTx(
ctx,
tt.broadcaster,
tt.txDecoder,
tt.txBytes,
duration,
)
if gotRes != nil {
// Ignoring timestamp for tests
gotRes.Timestamp = ""
}
assert.Equal(t, tt.expectedRes, gotRes)
assert.ErrorIs(t, gotErr, tt.expectedErr)
})
}

}
6 changes: 6 additions & 0 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type ChainClientConfig struct {
KeyDirectory string `json:"key-directory" yaml:"key-directory"`
Debug bool `json:"debug" yaml:"debug"`
Timeout string `json:"timeout" yaml:"timeout"`
BlockTimeout string `json:"block-timeout" yaml:"block-timeout"`
OutputFormat string `json:"output-format" yaml:"output-format"`
SignModeStr string `json:"sign-mode" yaml:"sign-mode"`
Modules []module.AppModuleBasic `json:"-" yaml:"-"`
Expand All @@ -69,6 +70,11 @@ func (ccc *ChainClientConfig) Validate() error {
if _, err := time.ParseDuration(ccc.Timeout); err != nil {
return err
}
if ccc.BlockTimeout != "" {
if _, err := time.ParseDuration(ccc.BlockTimeout); err != nil {
return err
}
}
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions client/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package client

type _err string

func (e _err) Error() string { return string(e) }

const (
ErrTimeoutAfterWaitingForTxBroadcast _err = "timed out after waiting for tx to get included in the block"
)

0 comments on commit ce29236

Please sign in to comment.