Skip to content

Commit

Permalink
Feat: tx rate limit (#7)
Browse files Browse the repository at this point in the history
* feat: Rate limit

* chore: Change default valuetest: Fix tests

* chore: Change default value to -1

* test: Add mock func for ResetRateLimitCounter

* chore: Rate -> Limit
  • Loading branch information
dudong2 authored Jan 17, 2025
1 parent 2209f5c commit 782b0da
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 11 deletions.
1 change: 1 addition & 0 deletions blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func newReactor(
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
mp.On("ResetRateLimitCounter").Return()

// Make the Reactor itself.
// NOTE we have to create and commit the blocks first because
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,8 @@ type MempoolConfig struct {
// Maximum size of a single transaction
// NOTE: the max size of a tx transmitted over the network is {max_tx_bytes}.
MaxTxBytes int `mapstructure:"max_tx_bytes"`
// Maximum possible input txs during consensus (= 1 block)
RateLimit int32 `mapstructure:"rate_limit"`
// Maximum size of a batch of transactions to send to a peer
// Including space needed by encoding (one varint per transaction).
// XXX: Unused due to https://github.com/tendermint/tendermint/issues/5796
Expand Down Expand Up @@ -795,6 +797,7 @@ func DefaultMempoolConfig() *MempoolConfig {
// ABCI Recheck
Size: 5000,
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
RateLimit: -1,
CacheSize: 10000,
MaxTxBytes: 1024 * 1024, // 1MB
ExperimentalMaxGossipConnectionsToNonPersistentPeers: 0,
Expand Down Expand Up @@ -840,6 +843,9 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.MaxTxBytes < 0 {
return errors.New("max_tx_bytes can't be negative")
}
if cfg.RateLimit < -1 {
return errors.New("rate_limit can't be lower than -1")
}
if cfg.ExperimentalMaxGossipConnectionsToPersistentPeers < 0 {
return errors.New("experimental_max_gossip_connections_to_persistent_peers can't be negative")
}
Expand Down
9 changes: 5 additions & 4 deletions consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ type emptyMempool struct{}

var _ mempl.Mempool = emptyMempool{}

func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) Lock() {}
func (emptyMempool) Unlock() {}
func (emptyMempool) Size() int { return 0 }
func (emptyMempool) SizeBytes() int64 { return 0 }
func (emptyMempool) ResetRateLimitCounter() {}
func (emptyMempool) CheckTxSync(types.Tx, func(*abci.ResponseCheckTx), mempl.TxInfo) (*abci.ResponseCheckTx, error) {
return nil, nil
}
Expand Down
62 changes: 55 additions & 7 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type CListMempool struct {

logger log.Logger
metrics *Metrics

rateLimitCounter *RateLimitCounter
}

type requestCheckTxAsync struct {
Expand All @@ -79,13 +81,14 @@ func NewCListMempool(
options ...CListMempoolOption,
) *CListMempool {
mp := &CListMempool{
config: cfg,
proxyAppConn: proxyAppConn,
txs: clist.New(),
chReqCheckTx: make(chan *requestCheckTxAsync, cfg.Size),
recheck: newRecheck(),
logger: log.NewNopLogger(),
metrics: NopMetrics(),
config: cfg,
proxyAppConn: proxyAppConn,
txs: clist.New(),
chReqCheckTx: make(chan *requestCheckTxAsync, cfg.Size),
recheck: newRecheck(),
logger: log.NewNopLogger(),
metrics: NopMetrics(),
rateLimitCounter: NewRateLimitCounter(),
}
mp.height.Store(height)

Expand Down Expand Up @@ -201,6 +204,7 @@ func (mem *CListMempool) Flush() {
mem.cache.Reset()

mem.removeAllTxs()
mem.ResetRateLimitCounter()
}

// TxsFront returns the first transaction in the ordered list for peer
Expand Down Expand Up @@ -356,6 +360,7 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {

// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
mem.metrics.RateLimitCounter.Set(float64(mem.rateLimitCounter.get()))

default:
// ignore other messages
Expand Down Expand Up @@ -387,6 +392,7 @@ func (mem *CListMempool) reqResCb(
// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
mem.metrics.SizeBytes.Set(float64(mem.SizeBytes()))
mem.metrics.RateLimitCounter.Set(float64(mem.rateLimitCounter.get()))

// passed in by the caller of CheckTx, eg. the RPC
if externalCb != nil {
Expand Down Expand Up @@ -431,6 +437,15 @@ func (mem *CListMempool) isFull(txSize int) error {
}
}

if mem.config.RateLimit > -1 {
if mem.rateLimitCounter.get() >= mem.config.RateLimit {
return ErrMempoolRateLimitExceeded{
Limit: mem.config.RateLimit,
Count: mem.rateLimitCounter.get(),
}
}
}

if mem.recheck.consideredFull() {
return ErrRecheckFull
}
Expand Down Expand Up @@ -487,6 +502,7 @@ func (mem *CListMempool) resCbFirstTime(
}
memTx.addSender(txInfo.SenderID)
mem.addTx(memTx)
mem.rateLimitCounter.inc()
mem.logger.Debug(
"added good transaction",
"tx", types.Tx(tx).Hash(),
Expand Down Expand Up @@ -704,6 +720,7 @@ func (mem *CListMempool) Update(
// Update metrics
mem.metrics.Size.Set(float64(mem.Size()))
mem.metrics.SizeBytes.Set(float64(mem.SizeBytes()))
mem.metrics.RateLimitCounter.Set(float64(mem.rateLimitCounter.get()))

return nil
}
Expand Down Expand Up @@ -754,6 +771,11 @@ func (mem *CListMempool) recheckTxs() {
mem.logger.Debug("done rechecking txs", "height", mem.height.Load(), "num-txs", mem.Size())
}

func (mem *CListMempool) ResetRateLimitCounter() {
mem.logger.Debug("reset rate limit counter")
mem.rateLimitCounter.reset()
}

// The cursor and end pointers define a dynamic list of transactions that could be rechecked. The
// end pointer is fixed. When a recheck response for a transaction is received, cursor will point to
// the entry in the mempool corresponding to that transaction, thus narrowing the list. Transactions
Expand Down Expand Up @@ -865,3 +887,29 @@ func (rc *recheck) setRecheckFull() bool {
func (rc *recheck) consideredFull() bool {
return rc.recheckFull.Load()
}

// RateLimitCounter tracks the number of transactions added to the mempool within a block.
// It is used to limit the rate of incoming transactions and is reset to zero after each block commit.
// The counter is incremented when a transaction is added to the mempool and provides a mechanism
// to enforce transaction rate limits on a per-block basis.
type RateLimitCounter struct {
counter atomic.Int32
}

func NewRateLimitCounter() *RateLimitCounter {
return &RateLimitCounter{
counter: atomic.Int32{},
}
}

func (c *RateLimitCounter) inc() int32 {
return c.counter.Add(1)
}

func (c *RateLimitCounter) get() int32 {
return c.counter.Load()
}

func (c *RateLimitCounter) reset() {
c.counter.Store(0)
}
23 changes: 23 additions & 0 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,29 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
}
}

func TestRateLimit(t *testing.T) {
app := kvstore.NewInMemoryApplication()
cc := proxy.NewLocalClientCreator(app)
mp, cleanup := newMempoolWithApp(cc)
mp.config.RateLimit = 3000
defer cleanup()

txs := make(types.Txs, 3100)
txInfo := TxInfo{SenderID: UnknownPeerID}
var err error
for i := 0; i < txs.Len(); i++ {
txBytes := kvstore.NewRandomTx(20)
txs[i] = txBytes
if _, err = mp.CheckTxSync(txBytes, nil, txInfo); err != nil {
if IsPreCheckError(err) {
t.Fatalf("PreCheck failed: %v while checking #%d tx", err, i)
continue
}
}
}
require.ErrorIs(t, err, ErrMempoolRateLimitExceeded{Limit: 3000, Count: 3000})
}

func TestTxsAvailable(t *testing.T) {
app := kvstore.NewInMemoryApplication()
cc := proxy.NewLocalClientCreator(app)
Expand Down
9 changes: 9 additions & 0 deletions mempool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ func (e ErrMempoolIsFull) Error() string {
)
}

type ErrMempoolRateLimitExceeded struct {
Limit int32
Count int32
}

func (e ErrMempoolRateLimitExceeded) Error() string {
return fmt.Sprintf("mempool rate limit exceeded: limit %d, count %d", e.Limit, e.Count)
}

// ErrPreCheck defines an error where a transaction fails a pre-check.
type ErrPreCheck struct {
Err error
Expand Down
3 changes: 3 additions & 0 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ type Mempool interface {

// SizeBytes returns the total size of all txs in the mempool.
SizeBytes() int64

// ResetRateLimitCounter resets the rate limit counter.
ResetRateLimitCounter()
}

// PreCheckFunc is an optional filter executed before CheckTx and rejects
Expand Down
7 changes: 7 additions & 0 deletions mempool/metrics.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions mempool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ type Metrics struct {
// Number of connections being actively used for gossiping transactions
// (experimental feature).
ActiveOutboundConnections metrics.Gauge

// Number of transactions in a current block
RateLimitCounter metrics.Gauge
}
5 changes: 5 additions & 0 deletions mempool/mocks/mempool.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions mempool/nop_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (*NopMempool) Size() int { return 0 }
// SizeBytes always returns 0.
func (*NopMempool) SizeBytes() int64 { return 0 }

// ResetRateLimitCounter does nothing.
func (*NopMempool) ResetRateLimitCounter() {}

// NopMempoolReactor is a mempool reactor that does nothing.
type NopMempoolReactor struct {
service.BaseService
Expand Down
1 change: 1 addition & 0 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func (blockExec *BlockExecutor) Commit(
TxPreCheck(state),
TxPostCheck(state),
)
blockExec.mempool.ResetRateLimitCounter()

return res.RetainHeight, err
}
Expand Down
4 changes: 4 additions & 0 deletions state/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestApplyBlock(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
mp.On("ResetRateLimitCounter").Return()
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
mp, sm.EmptyEvidencePool{}, blockStore)

Expand Down Expand Up @@ -126,6 +127,7 @@ func TestFinalizeBlockDecidedLastCommit(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
mp.On("ResetRateLimitCounter").Return()

eventBus := types.NewEventBus()
require.NoError(t, eventBus.Start())
Expand Down Expand Up @@ -340,6 +342,7 @@ func TestFinalizeBlockMisbehavior(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
mp.On("ResetRateLimitCounter").Return()

blockStore := store.NewBlockStore(dbm.NewMemDB())

Expand Down Expand Up @@ -599,6 +602,7 @@ func TestFinalizeBlockValidatorUpdates(t *testing.T) {
mock.Anything,
mock.Anything).Return(nil)
mp.On("ReapMaxBytesMaxGas", mock.Anything, mock.Anything).Return(types.Txs{})
mp.On("ResetRateLimitCounter").Return()

blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor(
Expand Down
3 changes: 3 additions & 0 deletions state/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestValidateBlockHeader(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
mp.On("ResetRateLimitCounter").Return()

blockStore := store.NewBlockStore(dbm.NewMemDB())

Expand Down Expand Up @@ -143,6 +144,7 @@ func TestValidateBlockCommit(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
mp.On("ResetRateLimitCounter").Return()

blockStore := store.NewBlockStore(dbm.NewMemDB())

Expand Down Expand Up @@ -295,6 +297,7 @@ func TestValidateBlockEvidence(t *testing.T) {
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
mp.On("ResetRateLimitCounter").Return()
state.ConsensusParams.Evidence.MaxBytes = 1000
blockStore := store.NewBlockStore(dbm.NewMemDB())

Expand Down

0 comments on commit 782b0da

Please sign in to comment.