From ce29236971a0167f2ebe8c1a0e21a2851fea6710 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matija=20Martini=C4=87?= Date: Wed, 4 May 2022 22:02:55 +0200 Subject: [PATCH] Fix for the buggy broadcasting logic (#140) * Fix for the buggy broadcasting logic * Forgot errors * Add block timeout --- client/broadcast.go | 71 ++++++++++++++++--- client/broadcast_test.go | 146 +++++++++++++++++++++++++++++++++++++++ client/config.go | 6 ++ client/errors.go | 9 +++ 4 files changed, 223 insertions(+), 9 deletions(-) create mode 100644 client/broadcast_test.go create mode 100644 client/errors.go diff --git a/client/broadcast.go b/client/broadcast.go index 2e2f1503..5877c1bc 100644 --- a/client/broadcast.go +++ b/client/broadcast.go @@ -14,15 +14,65 @@ import ( tmtypes "github.com/tendermint/tendermint/types" ) -func (cc *ChainClient) BroadcastTx(ctx context.Context, tx []byte) (res *sdk.TxResponse, err error) { +const ( + defaultBroadcastWaitTimeout = 10 * time.Minute +) + +func (cc *ChainClient) BroadcastTx(ctx context.Context, tx []byte) (*sdk.TxResponse, error) { + var ( + blockTimeout time.Duration = defaultBroadcastWaitTimeout + err error + ) + + if cc.Config.BlockTimeout != "" { + blockTimeout, err = time.ParseDuration(cc.Config.BlockTimeout) + if err != nil { + // Did you call Validate() method on ChainClientConfig struct + // before coming here? + return nil, err + } + } + + return broadcastTx( + ctx, + cc.RPCClient, + cc.Codec.TxConfig.TxDecoder(), + tx, + blockTimeout, + ) +} + +type rpcTxBroadcaster interface { + Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) + BroadcastTxSync(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) + + // TODO: implement commit and async as well + // BroadcastTxCommit(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTxCommit, error) + // BroadcastTxAsync(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) +} + +// broadcastTx broadcasts a TX and then waits for the TX to be included in the block. +// The waiting will either be canceled after the waitTimeout has run out or the context +// exited. +func broadcastTx( + ctx context.Context, + broadcaster rpcTxBroadcaster, + txDecoder sdk.TxDecoder, + tx []byte, + waitTimeout time.Duration, +) (*sdk.TxResponse, error) { // broadcast tx sync waits for check tx to pass // NOTE: this can return w/ a timeout // need to investigate if this will leave the tx // in the mempool or we can retry the broadcast at that // point - syncRes, err := cc.RPCClient.BroadcastTxSync(ctx, tx) - if errRes := CheckTendermintError(err, tx); errRes != nil || syncRes == nil { - return errRes, nil + syncRes, err := broadcaster.BroadcastTxSync(ctx, tx) + if err != nil { + errRes := CheckTendermintError(err, tx) + if errRes != nil { + return errRes, nil + } + return nil, err } if syncRes.Codespace == sdkerrors.RootCodespace && syncRes.Code == sdkerrors.ErrWrongSequence.ABCICode() { @@ -36,23 +86,26 @@ func (cc *ChainClient) BroadcastTx(ctx context.Context, tx []byte) (res *sdk.TxR // if not, we need to find a new way to block until inclusion in a block // wait for tx to be included in a block + exitAfter := time.After(waitTimeout) for { select { + case <-exitAfter: + return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, ErrTimeoutAfterWaitingForTxBroadcast) // TODO: this is potentially less than optimal and may // be better as something configurable case <-time.After(time.Millisecond * 100): - resTx, err := cc.RPCClient.Tx(ctx, syncRes.Hash, false) + resTx, err := broadcaster.Tx(ctx, syncRes.Hash, false) if err == nil { - return cc.mkTxResult(resTx) + return mkTxResult(txDecoder, resTx) } case <-ctx.Done(): - return + return nil, ctx.Err() } } } -func (cc *ChainClient) mkTxResult(resTx *ctypes.ResultTx) (*sdk.TxResponse, error) { - txb, err := cc.Codec.TxConfig.TxDecoder()(resTx.Tx) +func mkTxResult(txDecoder sdk.TxDecoder, resTx *ctypes.ResultTx) (*sdk.TxResponse, error) { + txb, err := txDecoder(resTx.Tx) if err != nil { return nil, err } diff --git a/client/broadcast_test.go b/client/broadcast_test.go new file mode 100644 index 00000000..1981d25f --- /dev/null +++ b/client/broadcast_test.go @@ -0,0 +1,146 @@ +package client + +import ( + "context" + "errors" + "testing" + "time" + + codectypes "github.com/cosmos/cosmos-sdk/codec/types" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/stretchr/testify/assert" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + tmtypes "github.com/tendermint/tendermint/types" +) + +var ( + errExpected = errors.New("expected unexpected error ;)") +) + +type myFakeMsg struct { + Value string +} + +func (m myFakeMsg) Reset() {} +func (m myFakeMsg) ProtoMessage() {} +func (m myFakeMsg) String() string { return "doesn't matter" } +func (m myFakeMsg) ValidateBasic() error { return nil } +func (m myFakeMsg) GetSigners() []sdk.AccAddress { return []sdk.AccAddress{sdk.AccAddress(`hello`)} } + +type myFakeTx struct { + msgs []myFakeMsg +} + +func (m myFakeTx) GetMsgs() (msgs []sdk.Msg) { + for _, msg := range m.msgs { + msgs = append(msgs, msg) + } + return +} +func (m myFakeTx) ValidateBasic() error { return nil } +func (m myFakeTx) AsAny() *codectypes.Any { return &codectypes.Any{} } + +type fakeBroadcaster struct { + tx func(context.Context, []byte, bool) (*ctypes.ResultTx, error) + broadcastSync func(context.Context, tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) +} + +func (f fakeBroadcaster) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + if f.tx == nil { + return nil, nil + } + return f.tx(ctx, hash, prove) +} + +func (f fakeBroadcaster) BroadcastTxSync(ctx context.Context, tx tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) { + if f.broadcastSync == nil { + return nil, nil + } + return f.broadcastSync(ctx, tx) +} + +func TestBroadcast(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + broadcaster fakeBroadcaster + txDecoder sdk.TxDecoder + txBytes []byte + waitTimeout time.Duration + expectedRes *sdk.TxResponse + expectedErr error + }{ + { + name: "simple success returns result", + broadcaster: fakeBroadcaster{ + broadcastSync: func(_ context.Context, _ tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) { + return &ctypes.ResultBroadcastTx{ + Code: 1, + Hash: []byte(`123bob`), + }, nil + }, + tx: func(_ context.Context, hash []byte, _ bool) (*ctypes.ResultTx, error) { + assert.Equal(t, []byte(`123bob`), hash) + return &ctypes.ResultTx{}, nil + }, + }, + txDecoder: func(txBytes []byte) (sdk.Tx, error) { + return myFakeTx{ + []myFakeMsg{{"hello"}}, + }, nil + }, + expectedRes: &sdk.TxResponse{ + Tx: &codectypes.Any{}, + }, + }, + { + name: "success but timed out while waiting for tx", + waitTimeout: time.Microsecond, + broadcaster: fakeBroadcaster{ + broadcastSync: func(_ context.Context, _ tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) { + return &ctypes.ResultBroadcastTx{ + Code: 1, + Hash: []byte(`123bob`), + }, nil + }, + tx: func(_ context.Context, hash []byte, _ bool) (*ctypes.ResultTx, error) { + <-time.After(time.Second) + // return doesn't matter because it will timeout before return will make sense + return nil, nil + }, + }, + expectedErr: ErrTimeoutAfterWaitingForTxBroadcast, + }, + { + name: "broadcasting returns an error", + broadcaster: fakeBroadcaster{ + broadcastSync: func(_ context.Context, _ tmtypes.Tx) (*ctypes.ResultBroadcastTx, error) { + return nil, errExpected + }, + }, + expectedErr: errExpected, + }, + } { + t.Run(tt.name, func(t *testing.T) { + duration := 1 * time.Second + if tt.waitTimeout > 0 { + duration = tt.waitTimeout + } + gotRes, gotErr := broadcastTx( + ctx, + tt.broadcaster, + tt.txDecoder, + tt.txBytes, + duration, + ) + if gotRes != nil { + // Ignoring timestamp for tests + gotRes.Timestamp = "" + } + assert.Equal(t, tt.expectedRes, gotRes) + assert.ErrorIs(t, gotErr, tt.expectedErr) + }) + } + +} diff --git a/client/config.go b/client/config.go index 1dc2c467..126dc728 100644 --- a/client/config.go +++ b/client/config.go @@ -60,6 +60,7 @@ type ChainClientConfig struct { KeyDirectory string `json:"key-directory" yaml:"key-directory"` Debug bool `json:"debug" yaml:"debug"` Timeout string `json:"timeout" yaml:"timeout"` + BlockTimeout string `json:"block-timeout" yaml:"block-timeout"` OutputFormat string `json:"output-format" yaml:"output-format"` SignModeStr string `json:"sign-mode" yaml:"sign-mode"` Modules []module.AppModuleBasic `json:"-" yaml:"-"` @@ -69,6 +70,11 @@ func (ccc *ChainClientConfig) Validate() error { if _, err := time.ParseDuration(ccc.Timeout); err != nil { return err } + if ccc.BlockTimeout != "" { + if _, err := time.ParseDuration(ccc.BlockTimeout); err != nil { + return err + } + } return nil } diff --git a/client/errors.go b/client/errors.go new file mode 100644 index 00000000..fd050cab --- /dev/null +++ b/client/errors.go @@ -0,0 +1,9 @@ +package client + +type _err string + +func (e _err) Error() string { return string(e) } + +const ( + ErrTimeoutAfterWaitingForTxBroadcast _err = "timed out after waiting for tx to get included in the block" +)