From 3b84167492c31aa6024875f6cd5ce960d5701807 Mon Sep 17 00:00:00 2001
From: dudong2
Date: Tue, 21 Jan 2025 18:05:06 +0900
Subject: [PATCH 1/2] fix: Parallel rechecktx

---
 mempool/clist_mempool.go | 166 ++++-----------------------------------
 1 file changed, 14 insertions(+), 152 deletions(-)

diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go
index d63595eff12..51a8cb32b6a 100644
--- a/mempool/clist_mempool.go
+++ b/mempool/clist_mempool.go
@@ -1,13 +1,11 @@
 package mempool
 
 import (
-    "bytes"
     "context"
     "errors"
     "fmt"
     "sync"
     "sync/atomic"
-    "time"
 
     abci "github.com/cometbft/cometbft/abci/types"
     "github.com/cometbft/cometbft/config"
@@ -45,9 +43,6 @@ type CListMempool struct {
     txs          *clist.CList // concurrent linked-list of good txs
     proxyAppConn proxy.AppConnMempool
 
-    // Keeps track of the rechecking process.
-    recheck *recheck
-
     // Map for quick access to txs to record sender in CheckTx.
     // txsMap: txKey -> CElement
     txsMap sync.Map
@@ -85,7 +80,6 @@ func NewCListMempool(
         proxyAppConn:     proxyAppConn,
         txs:              clist.New(),
         chReqCheckTx:     make(chan *requestCheckTxAsync, cfg.Size),
-        recheck:          newRecheck(),
         logger:           log.NewNopLogger(),
         metrics:          NopMetrics(),
         rateLimitCounter: NewRateLimitCounter(),
@@ -164,9 +158,6 @@ func WithMetrics(metrics *Metrics) CListMempoolOption {
 
 // Safe for concurrent use by multiple goroutines.
 func (mem *CListMempool) Lock() {
-    if mem.recheck.setRecheckFull() {
-        mem.logger.Debug("the state of recheckFull has flipped")
-    }
     mem.updateMtx.Lock()
 }
 
@@ -349,14 +340,12 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
 
     switch r := res.Value.(type) {
     case *abci.Response_CheckTx:
-        tx := types.Tx(req.GetCheckTx().Tx)
-        if mem.recheck.done() {
-            mem.logger.Error("rechecking has finished; discard late recheck response",
-                "tx", log.NewLazySprintf("%v", tx.Key()))
+        checkTxReq := req.GetCheckTx()
+        if checkTxReq == nil {
             return
         }
         mem.metrics.RecheckTimes.Add(1)
-        mem.resCbRecheck(tx, r.CheckTx)
+        mem.resCbRecheck(checkTxReq.Tx, r.CheckTx)
 
         // update metrics
         mem.metrics.Size.Set(float64(mem.Size()))
@@ -382,11 +371,6 @@ func (mem *CListMempool) reqResCb(
     res *abci.Response,
     externalCb func(*abci.ResponseCheckTx),
 ) {
-    if !mem.recheck.done() {
-        panic(log.NewLazySprintf("rechecking has not finished; cannot check new tx %v",
-            types.Tx(tx).Hash()))
-    }
-
     mem.resCbFirstTime(tx, txInfo, res)
 
     // update metrics
@@ -446,10 +430,6 @@ func (mem *CListMempool) isFull(txSize int) error {
         }
     }
 
-    if mem.recheck.consideredFull() {
-        return ErrRecheckFull
-    }
-
     return nil
 }
 
@@ -539,8 +519,10 @@ func (mem *CListMempool) resCbFirstTime(
 
 // resCbFirstTime callback.
 func (mem *CListMempool) resCbRecheck(tx types.Tx, res *abci.ResponseCheckTx) {
     // Check whether tx is still in the list of transactions that can be rechecked.
-    if !mem.recheck.findNextEntryMatching(&tx) {
-        // Reached the end of the list and didn't find a matching tx; rechecking has finished.
+    txKey := tx.Key()
+    _, ok := mem.txsMap.Load(txKey)
+    if !ok {
+        mem.logger.Debug("re-CheckTx transaction does not exist", "expected", tx.Hash())
         return
     }
@@ -734,40 +716,32 @@ func (mem *CListMempool) recheckTxs() {
         return
     }
 
-    mem.recheck.init(mem.txs.Front(), mem.txs.Back())
+    wg := sync.WaitGroup{}
 
     // NOTE: globalCb may be called concurrently, but CheckTx cannot be executed concurrently
     // because this function has the lock (via Update and Lock).
     for e := mem.txs.Front(); e != nil; e = e.Next() {
         tx := e.Value.(*mempoolTx).tx
-        mem.recheck.numPendingTxs.Add(1)
+        wg.Add(1)
 
         // Send a CheckTx request to the app. If we're using a sync client, the resCbRecheck
         // callback will be called right after receiving the response.
-        _, err := mem.proxyAppConn.CheckTxAsync(context.TODO(), &abci.RequestCheckTx{
+        reqRes, err := mem.proxyAppConn.CheckTxAsync(context.TODO(), &abci.RequestCheckTx{
             Tx:   tx,
             Type: abci.CheckTxType_Recheck,
         })
         if err != nil {
             panic(fmt.Errorf("(re-)CheckTx request for tx %s failed: %w", log.NewLazySprintf("%v", tx.Hash()), err))
         }
+        reqRes.SetCallback(func(res *abci.Response) {
+            wg.Done()
+        })
     }
+    wg.Wait()
 
     // Flush any pending asynchronous recheck requests to process.
     mem.proxyAppConn.Flush(context.TODO())
 
-    // Give some time to finish processing the responses; then finish the rechecking process, even
-    // if not all txs were rechecked.
-    select {
-    case <-time.After(mem.config.RecheckTimeout):
-        mem.recheck.setDone()
-        mem.logger.Error("timed out waiting for recheck responses")
-    case <-mem.recheck.doneRechecking():
-    }
-
-    if n := mem.recheck.numPendingTxs.Load(); n > 0 {
-        mem.logger.Error("not all txs were rechecked", "not-rechecked", n)
-    }
     mem.logger.Debug("done rechecking txs", "height", mem.height.Load(), "num-txs", mem.Size())
 }
 
@@ -776,118 +750,6 @@ func (mem *CListMempool) ResetRateLimitCounter() {
     mem.rateLimitCounter.reset()
 }
 
-// The cursor and end pointers define a dynamic list of transactions that could be rechecked. The
-// end pointer is fixed. When a recheck response for a transaction is received, cursor will point to
-// the entry in the mempool corresponding to that transaction, thus narrowing the list. Transactions
-// corresponding to entries between the old and current positions of cursor will be ignored for
-// rechecking. This is to guarantee that recheck responses are processed in the same sequential
-// order as they appear in the mempool.
-type recheck struct {
-    cursor        *clist.CElement // next expected recheck response
-    end           *clist.CElement // last entry in the mempool to recheck
-    doneCh        chan struct{}   // to signal that rechecking has finished successfully (for async app connections)
-    numPendingTxs atomic.Int32    // number of transactions still pending to recheck
-    isRechecking  atomic.Bool     // true iff the rechecking process has begun and is not yet finished
-    recheckFull   atomic.Bool     // whether rechecking TXs cannot be completed before a new block is decided
-}
-
-func newRecheck() *recheck {
-    return &recheck{
-        doneCh: make(chan struct{}, 1),
-    }
-}
-
-func (rc *recheck) init(first, last *clist.CElement) {
-    if !rc.done() {
-        panic("Having more than one rechecking process at a time is not possible.")
-    }
-    rc.cursor = first
-    rc.end = last
-    rc.numPendingTxs.Store(0)
-    rc.isRechecking.Store(true)
-}
-
-// done returns true when there is no recheck response to process.
-// Safe for concurrent use by multiple goroutines.
-func (rc *recheck) done() bool {
-    return !rc.isRechecking.Load()
-}
-
-// setDone registers that rechecking has finished.
-func (rc *recheck) setDone() {
-    rc.cursor = nil
-    rc.recheckFull.Store(false)
-    rc.isRechecking.Store(false)
-}
-
-// setNextEntry sets cursor to the next entry in the list. If there is no next, cursor will be nil.
-func (rc *recheck) setNextEntry() {
-    rc.cursor = rc.cursor.Next()
-}
-
-// tryFinish will check if the cursor is at the end of the list and notify the channel that
-// rechecking has finished. It returns true iff it's done rechecking.
-func (rc *recheck) tryFinish() bool {
-    if rc.cursor == rc.end {
-        // Reached end of the list without finding a matching tx.
-        rc.setDone()
-    }
-    if rc.done() {
-        // Notify that recheck has finished.
-        select {
-        case rc.doneCh <- struct{}{}:
-        default:
-        }
-        return true
-    }
-    return false
-}
-
-// findNextEntryMatching searches for the next transaction matching the given transaction, which
-// corresponds to the recheck response to be processed next. Then it checks if it has reached the
-// end of the list, so it can finish rechecking.
-//
-// The goal is to guarantee that transactions are rechecked in the order in which they are in the
-// mempool. Transactions whose recheck response arrive late or don't arrive at all are skipped and
-// not rechecked.
-func (rc *recheck) findNextEntryMatching(tx *types.Tx) bool {
-    found := false
-    for ; !rc.done(); rc.setNextEntry() {
-        expectedTx := rc.cursor.Value.(*mempoolTx).tx
-        if bytes.Equal(*tx, expectedTx) {
-            // Found an entry in the list of txs to recheck that matches tx.
-            found = true
-            rc.numPendingTxs.Add(-1)
-            break
-        }
-    }
-
-    if !rc.tryFinish() {
-        // Not finished yet; set the cursor for processing the next recheck response.
-        rc.setNextEntry()
-    }
-    return found
-}
-
-// doneRechecking returns the channel used to signal that rechecking has finished.
-func (rc *recheck) doneRechecking() <-chan struct{} {
-    return rc.doneCh
-}
-
-// setRecheckFull sets recheckFull to true if rechecking is still in progress. It returns true iff
-// the value of recheckFull has changed.
-func (rc *recheck) setRecheckFull() bool {
-    rechecking := !rc.done()
-    recheckFull := rc.recheckFull.Swap(rechecking)
-    return rechecking != recheckFull
-}
-
-// consideredFull returns true iff the mempool should be considered as full while rechecking is in
-// progress.
-func (rc *recheck) consideredFull() bool {
-    return rc.recheckFull.Load()
-}
-
 // RateLimitCounter tracks the number of transactions added to the mempool within a block.
 // It is used to limit the rate of incoming transactions and is reset to zero after each block commit.
 // The counter is incremented when a transaction is added to the mempool and provides a mechanism
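Note (editorial illustration, not part of the patch): the recheckTxs hunk above replaces the cursor/timeout bookkeeping with a sync.WaitGroup that is incremented before each CheckTxAsync call and decremented from the callback registered on the returned request handle via reqRes.SetCallback. The standalone Go sketch below shows that wait-for-all pattern in isolation; recheckAll, checkAsync, and onDone are illustrative names, not the CometBFT proxy API.

package main

import (
    "fmt"
    "sync"
)

// recheckAll sends one asynchronous request per transaction and blocks until the
// completion callback for every request has run, mirroring the patched recheckTxs
// (wg.Add(1) before each request, wg.Done() in the callback, wg.Wait() at the end).
func recheckAll(txs [][]byte, checkAsync func(tx []byte, onDone func())) {
    var wg sync.WaitGroup
    for _, tx := range txs {
        wg.Add(1)
        checkAsync(tx, wg.Done) // onDone fires once the response for tx has been processed
    }
    wg.Wait() // every recheck response has been handled once this returns
}

func main() {
    txs := [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}

    // Toy async "client": each response arrives on its own goroutine.
    checkAsync := func(tx []byte, onDone func()) {
        go func() {
            fmt.Printf("rechecked %s\n", tx)
            onDone()
        }()
    }

    recheckAll(txs, checkAsync)
    fmt.Println("all rechecks completed")
}

Because recheckTxs now returns only after every callback has run, the per-transaction cursor, the doneCh channel, and the RecheckTimeout fallback become unnecessary, which is why the recheck type above is deleted outright.
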
From e1c49a717c77d7a70c49b2c8d55e9c5bed58f9f5 Mon Sep 17 00:00:00 2001
From: dudong2
Date: Tue, 21 Jan 2025 18:10:51 +0900
Subject: [PATCH 2/2] test: Remove recheck related codes

---
 mempool/clist_mempool_test.go | 18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go
index 86adf9e4888..024cc776813 100644
--- a/mempool/clist_mempool_test.go
+++ b/mempool/clist_mempool_test.go
@@ -305,7 +305,6 @@ func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) {
     }
     time.Sleep(10 * time.Millisecond)
     require.Len(t, txs, mp.Size())
-    require.True(t, mp.recheck.done())
 
     // Calling update to remove the first transaction from the mempool.
     // This call also triggers the mempool to recheck its remaining transactions.
@@ -931,11 +930,6 @@ func TestMempoolAsyncRecheckTxReturnError(t *testing.T) {
     // The 4 txs are added to the mempool.
     require.Len(t, txs, mp.Size())
 
-    // Check that recheck has not started.
-    require.True(t, mp.recheck.done())
-    require.Nil(t, mp.recheck.cursor)
-    require.Nil(t, mp.recheck.end)
-    require.False(t, mp.recheck.isRechecking.Load())
     mockClient.AssertExpectations(t)
 
     // One call to CheckTxAsync per tx, for rechecking.
@@ -960,15 +954,8 @@
         callback(reqRes2.Request, reqRes2.Response)
     }).Return(nil)
 
-    // mp.recheck.done() should be true only before and after calling recheckTxs.
     mp.recheckTxs()
-    require.True(t, mp.recheck.done())
-    require.False(t, mp.recheck.isRechecking.Load())
-    require.Nil(t, mp.recheck.cursor)
-    require.NotNil(t, mp.recheck.end)
-    require.Equal(t, mp.recheck.end, mp.txs.Back())
     require.Equal(t, len(txs)-1, mp.Size()) // one invalid tx was removed
-    require.Equal(t, int32(2), mp.recheck.numPendingTxs.Load())
 
     mockClient.AssertExpectations(t)
 }
@@ -989,15 +976,10 @@ func TestMempoolRecheckRace(t *testing.T) {
     // Update one transaction to force rechecking the rest.
     doUpdate(t, mp, 1, txs[:1])
 
-    // Recheck has finished
-    require.True(t, mp.recheck.done())
-    require.Nil(t, mp.recheck.cursor)
-
     // Add again the same transaction that was updated. Recheck has finished so adding this tx
     // should not result in a data race on the variable recheck.cursor.
     _, err = mp.CheckTxSync(txs[:1][0], nil, TxInfo{})
     require.Equal(t, err, ErrTxInCache)
-    require.Zero(t, mp.recheck.numPendingTxs.Load())
 }
 
 // Test adding transactions while a concurrent routine reaps txs and updates the mempool, simulating
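Note (editorial illustration, not part of the patch): with the recheck cursor removed, the patched resCbRecheck in PATCH 1 only has to decide whether a recheck response still corresponds to a transaction the mempool tracks, which it does by looking the transaction key up in txsMap. The standalone Go sketch below mirrors that membership check with a sync.Map; txKey, handleRecheckResponse, and the valid flag are illustrative stand-ins, not the mempool's actual types.

package main

import (
    "crypto/sha256"
    "fmt"
    "sync"
)

// txKey mirrors the idea of a transaction key: a fixed-size digest used as the map key.
func txKey(tx []byte) [sha256.Size]byte { return sha256.Sum256(tx) }

// handleRecheckResponse ignores responses for transactions that are no longer tracked,
// and drops a tracked transaction when the application reports it as invalid,
// analogous to the txsMap.Load check in the patched resCbRecheck.
func handleRecheckResponse(txs *sync.Map, tx []byte, valid bool) {
    key := txKey(tx)
    if _, ok := txs.Load(key); !ok {
        fmt.Printf("re-CheckTx transaction does not exist: %x\n", key[:4])
        return
    }
    if !valid {
        txs.Delete(key) // an invalid tx is removed from the mempool
    }
}

func main() {
    var txs sync.Map
    a, b := []byte("tx-a"), []byte("tx-b")
    txs.Store(txKey(a), struct{}{})
    txs.Store(txKey(b), struct{}{})

    handleRecheckResponse(&txs, a, false)             // tracked and invalid: removed
    handleRecheckResponse(&txs, []byte("tx-c"), true) // unknown: ignored
    _, stillTracked := txs.Load(txKey(b))
    fmt.Println("tx-b still tracked:", stillTracked)
}

This is also why the tests in PATCH 2 drop their assertions on mp.recheck fields: the state they inspected (cursor, end, numPendingTxs) no longer exists after PATCH 1.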