Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Parallel rechecktx #9

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 14 additions & 152 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package mempool

import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/config"
Expand Down Expand Up @@ -45,9 +43,6 @@ type CListMempool struct {
txs *clist.CList // concurrent linked-list of good txs
proxyAppConn proxy.AppConnMempool

// Keeps track of the rechecking process.
recheck *recheck

// Map for quick access to txs to record sender in CheckTx.
// txsMap: txKey -> CElement
txsMap sync.Map
Expand Down Expand Up @@ -85,7 +80,6 @@ func NewCListMempool(
proxyAppConn: proxyAppConn,
txs: clist.New(),
chReqCheckTx: make(chan *requestCheckTxAsync, cfg.Size),
recheck: newRecheck(),
logger: log.NewNopLogger(),
metrics: NopMetrics(),
rateLimitCounter: NewRateLimitCounter(),
Expand Down Expand Up @@ -164,9 +158,6 @@ func WithMetrics(metrics *Metrics) CListMempoolOption {

// Safe for concurrent use by multiple goroutines.
func (mem *CListMempool) Lock() {
if mem.recheck.setRecheckFull() {
mem.logger.Debug("the state of recheckFull has flipped")
}
mem.updateMtx.Lock()
}

Expand Down Expand Up @@ -349,14 +340,12 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {

switch r := res.Value.(type) {
case *abci.Response_CheckTx:
tx := types.Tx(req.GetCheckTx().Tx)
if mem.recheck.done() {
mem.logger.Error("rechecking has finished; discard late recheck response",
"tx", log.NewLazySprintf("%v", tx.Key()))
checkTxReq := req.GetCheckTx()
if checkTxReq == nil {
return
}
mem.metrics.RecheckTimes.Add(1)
mem.resCbRecheck(tx, r.CheckTx)
mem.resCbRecheck(checkTxReq.Tx, r.CheckTx)

// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
Expand All @@ -382,11 +371,6 @@ func (mem *CListMempool) reqResCb(
res *abci.Response,
externalCb func(*abci.ResponseCheckTx),
) {
if !mem.recheck.done() {
panic(log.NewLazySprintf("rechecking has not finished; cannot check new tx %v",
types.Tx(tx).Hash()))
}

mem.resCbFirstTime(tx, txInfo, res)

// update metrics
Expand Down Expand Up @@ -446,10 +430,6 @@ func (mem *CListMempool) isFull(txSize int) error {
}
}

if mem.recheck.consideredFull() {
return ErrRecheckFull
}

return nil
}

Expand Down Expand Up @@ -539,8 +519,10 @@ func (mem *CListMempool) resCbFirstTime(
// resCbFirstTime callback.
func (mem *CListMempool) resCbRecheck(tx types.Tx, res *abci.ResponseCheckTx) {
// Check whether tx is still in the list of transactions that can be rechecked.
if !mem.recheck.findNextEntryMatching(&tx) {
// Reached the end of the list and didn't find a matching tx; rechecking has finished.
txKey := tx.Key()
_, ok := mem.txsMap.Load(txKey)
if !ok {
mem.logger.Debug("re-CheckTx transaction does not exist", "expected", tx.Hash())
return
}

Expand Down Expand Up @@ -734,40 +716,32 @@ func (mem *CListMempool) recheckTxs() {
return
}

mem.recheck.init(mem.txs.Front(), mem.txs.Back())
wg := sync.WaitGroup{}

// NOTE: globalCb may be called concurrently, but CheckTx cannot be executed concurrently
// because this function has the lock (via Update and Lock).
for e := mem.txs.Front(); e != nil; e = e.Next() {
tx := e.Value.(*mempoolTx).tx
mem.recheck.numPendingTxs.Add(1)
wg.Add(1)

// Send a CheckTx request to the app. If we're using a sync client, the resCbRecheck
// callback will be called right after receiving the response.
_, err := mem.proxyAppConn.CheckTxAsync(context.TODO(), &abci.RequestCheckTx{
reqRes, err := mem.proxyAppConn.CheckTxAsync(context.TODO(), &abci.RequestCheckTx{
Tx: tx,
Type: abci.CheckTxType_Recheck,
})
if err != nil {
panic(fmt.Errorf("(re-)CheckTx request for tx %s failed: %w", log.NewLazySprintf("%v", tx.Hash()), err))
}
reqRes.SetCallback(func(res *abci.Response) {
wg.Done()
})
}
wg.Wait()

// Flush any pending asynchronous recheck requests to process.
mem.proxyAppConn.Flush(context.TODO())

// Give some time to finish processing the responses; then finish the rechecking process, even
// if not all txs were rechecked.
select {
case <-time.After(mem.config.RecheckTimeout):
mem.recheck.setDone()
mem.logger.Error("timed out waiting for recheck responses")
case <-mem.recheck.doneRechecking():
}

if n := mem.recheck.numPendingTxs.Load(); n > 0 {
mem.logger.Error("not all txs were rechecked", "not-rechecked", n)
}
mem.logger.Debug("done rechecking txs", "height", mem.height.Load(), "num-txs", mem.Size())
}

Expand All @@ -776,118 +750,6 @@ func (mem *CListMempool) ResetRateLimitCounter() {
mem.rateLimitCounter.reset()
}

// The cursor and end pointers define a dynamic list of transactions that could be rechecked. The
// end pointer is fixed. When a recheck response for a transaction is received, cursor will point to
// the entry in the mempool corresponding to that transaction, thus narrowing the list. Transactions
// corresponding to entries between the old and current positions of cursor will be ignored for
// rechecking. This is to guarantee that recheck responses are processed in the same sequential
// order as they appear in the mempool.
type recheck struct {
cursor *clist.CElement // next expected recheck response
end *clist.CElement // last entry in the mempool to recheck
doneCh chan struct{} // to signal that rechecking has finished successfully (for async app connections)
numPendingTxs atomic.Int32 // number of transactions still pending to recheck
isRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished
recheckFull atomic.Bool // whether rechecking TXs cannot be completed before a new block is decided
}

func newRecheck() *recheck {
return &recheck{
doneCh: make(chan struct{}, 1),
}
}

func (rc *recheck) init(first, last *clist.CElement) {
if !rc.done() {
panic("Having more than one rechecking process at a time is not possible.")
}
rc.cursor = first
rc.end = last
rc.numPendingTxs.Store(0)
rc.isRechecking.Store(true)
}

// done returns true when there is no recheck response to process.
// Safe for concurrent use by multiple goroutines.
func (rc *recheck) done() bool {
return !rc.isRechecking.Load()
}

// setDone registers that rechecking has finished.
func (rc *recheck) setDone() {
rc.cursor = nil
rc.recheckFull.Store(false)
rc.isRechecking.Store(false)
}

// setNextEntry sets cursor to the next entry in the list. If there is no next, cursor will be nil.
func (rc *recheck) setNextEntry() {
rc.cursor = rc.cursor.Next()
}

// tryFinish will check if the cursor is at the end of the list and notify the channel that
// rechecking has finished. It returns true iff it's done rechecking.
func (rc *recheck) tryFinish() bool {
if rc.cursor == rc.end {
// Reached end of the list without finding a matching tx.
rc.setDone()
}
if rc.done() {
// Notify that recheck has finished.
select {
case rc.doneCh <- struct{}{}:
default:
}
return true
}
return false
}

// findNextEntryMatching searches for the next transaction matching the given transaction, which
// corresponds to the recheck response to be processed next. Then it checks if it has reached the
// end of the list, so it can finish rechecking.
//
// The goal is to guarantee that transactions are rechecked in the order in which they are in the
// mempool. Transactions whose recheck response arrive late or don't arrive at all are skipped and
// not rechecked.
func (rc *recheck) findNextEntryMatching(tx *types.Tx) bool {
found := false
for ; !rc.done(); rc.setNextEntry() {
expectedTx := rc.cursor.Value.(*mempoolTx).tx
if bytes.Equal(*tx, expectedTx) {
// Found an entry in the list of txs to recheck that matches tx.
found = true
rc.numPendingTxs.Add(-1)
break
}
}

if !rc.tryFinish() {
// Not finished yet; set the cursor for processing the next recheck response.
rc.setNextEntry()
}
return found
}

// doneRechecking returns the channel used to signal that rechecking has finished.
func (rc *recheck) doneRechecking() <-chan struct{} {
return rc.doneCh
}

// setRecheckFull sets recheckFull to true if rechecking is still in progress. It returns true iff
// the value of recheckFull has changed.
func (rc *recheck) setRecheckFull() bool {
rechecking := !rc.done()
recheckFull := rc.recheckFull.Swap(rechecking)
return rechecking != recheckFull
}

// consideredFull returns true iff the mempool should be considered as full while rechecking is in
// progress.
func (rc *recheck) consideredFull() bool {
return rc.recheckFull.Load()
}

// RateLimitCounter tracks the number of transactions added to the mempool within a block.
// It is used to limit the rate of incoming transactions and is reset to zero after each block commit.
// The counter is incremented when a transaction is added to the mempool and provides a mechanism
Expand Down
18 changes: 0 additions & 18 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) {
}
time.Sleep(10 * time.Millisecond)
require.Len(t, txs, mp.Size())
require.True(t, mp.recheck.done())

// Calling update to remove the first transaction from the mempool.
// This call also triggers the mempool to recheck its remaining transactions.
Expand Down Expand Up @@ -931,11 +930,6 @@ func TestMempoolAsyncRecheckTxReturnError(t *testing.T) {
// The 4 txs are added to the mempool.
require.Len(t, txs, mp.Size())

// Check that recheck has not started.
require.True(t, mp.recheck.done())
require.Nil(t, mp.recheck.cursor)
require.Nil(t, mp.recheck.end)
require.False(t, mp.recheck.isRechecking.Load())
mockClient.AssertExpectations(t)

// One call to CheckTxAsync per tx, for rechecking.
Expand All @@ -960,15 +954,8 @@ func TestMempoolAsyncRecheckTxReturnError(t *testing.T) {
callback(reqRes2.Request, reqRes2.Response)
}).Return(nil)

// mp.recheck.done() should be true only before and after calling recheckTxs.
mp.recheckTxs()
require.True(t, mp.recheck.done())
require.False(t, mp.recheck.isRechecking.Load())
require.Nil(t, mp.recheck.cursor)
require.NotNil(t, mp.recheck.end)
require.Equal(t, mp.recheck.end, mp.txs.Back())
require.Equal(t, len(txs)-1, mp.Size()) // one invalid tx was removed
require.Equal(t, int32(2), mp.recheck.numPendingTxs.Load())

mockClient.AssertExpectations(t)
}
Expand All @@ -989,15 +976,10 @@ func TestMempoolRecheckRace(t *testing.T) {
// Update one transaction to force rechecking the rest.
doUpdate(t, mp, 1, txs[:1])

// Recheck has finished
require.True(t, mp.recheck.done())
require.Nil(t, mp.recheck.cursor)

// Add again the same transaction that was updated. Recheck has finished so adding this tx
// should not result in a data race on the variable recheck.cursor.
_, err = mp.CheckTxSync(txs[:1][0], nil, TxInfo{})
require.Equal(t, err, ErrTxInCache)
require.Zero(t, mp.recheck.numPendingTxs.Load())
}

// Test adding transactions while a concurrent routine reaps txs and updates the mempool, simulating
Expand Down
Loading