Skip to content

Commit

Permalink
Merge pull request #13 from astriaorg/bharath/cherry-pick-fix-mempool
Browse files Browse the repository at this point in the history
cherry-pick: fix: use mutex lock for tx removal (#51)
  • Loading branch information
bharath-123 authored Oct 16, 2024
2 parents 401c3d5 + c3cfc16 commit 459cc3d
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 0 deletions.
2 changes: 2 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ func (pool *LegacyPool) ClearAstriaOrdered() {
if pool.astria == nil {
return
}
pool.mu.Lock()
defer pool.mu.Unlock()

astriaExcludedFromBlockMeter.Mark(int64(len(pool.astria.excludedFromBlock)))
for _, tx := range pool.astria.excludedFromBlock {
Expand Down
61 changes: 61 additions & 0 deletions core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,67 @@ func TestChainFork(t *testing.T) {
}
}

func TestRemoveTxSanity(t *testing.T) {
t.Parallel()

pool, key := setupPool()
defer pool.Close()

addr := crypto.PubkeyToAddress(key.PublicKey)
resetState := func() {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
statedb.AddBalance(addr, uint256.NewInt(100000000000000), tracing.BalanceChangeUnspecified)

pool.chain = newTestBlockChain(pool.chainconfig, 1000000, statedb, new(event.Feed))
<-pool.requestReset(nil, nil)
}
resetState()

tx1 := transaction(0, 100000, key)
tx2 := transaction(1, 100000, key)
tx3 := transaction(2, 100000, key)

if err := pool.addLocal(tx1); err != nil {
t.Error("didn't expect error", err)
}
if err := pool.addLocal(tx2); err != nil {
t.Error("didn't expect error", err)
}
if err := pool.addLocal(tx3); err != nil {
t.Error("didn't expect error", err)
}

pendingTxs := pool.pending[addr]
if pendingTxs.Len() != 3 {
t.Error("expected 3 pending transactions, got", pendingTxs.Len())
}

if err := validatePoolInternals(pool); err != nil {
t.Errorf("pool internals validation failed: %v", err)
}

n := pool.removeTx(tx1.Hash(), false, true)
if n != 3 {
t.Error("expected 3 transactions to be removed, got", n)
}
n = pool.removeTx(tx2.Hash(), false, true)
if n != 0 {
t.Error("expected 0 transactions to be removed, got", n)
}
n = pool.removeTx(tx3.Hash(), false, true)
if n != 0 {
t.Error("expected 0 transactions to be removed, got", n)
}

if len(pool.pending) != 0 {
t.Error("expected 0 pending transactions, got", pendingTxs.Len())
}

if err := validatePoolInternals(pool); err != nil {
t.Errorf("pool internals validation failed: %v", err)
}
}

func TestDoubleNonce(t *testing.T) {
t.Parallel()

Expand Down
137 changes: 137 additions & 0 deletions grpc/execution/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"math/big"
"testing"
"time"
)

func TestExecutionService_GetGenesisInfo(t *testing.T) {
Expand Down Expand Up @@ -475,3 +476,139 @@ func TestExecutionServiceServerV1Alpha2_ExecuteBlockAndUpdateCommitment(t *testi
balanceDiff := new(uint256.Int).Sub(chainDestinationAddressBalanceAfter, chainDestinationAddressBalanceBefore)
require.True(t, balanceDiff.Cmp(uint256.NewInt(1000000000000000000)) == 0, "Chain destination address balance is not correct")
}

// Check that invalid transactions are not added into a block and are removed from the mempool
func TestExecutionServiceServerV1Alpha2_ExecuteBlockAndUpdateCommitmentWithInvalidTransactions(t *testing.T) {
ethservice, serviceV1Alpha1 := setupExecutionService(t, 10)

// call genesis info
genesisInfo, err := serviceV1Alpha1.GetGenesisInfo(context.Background(), &astriaPb.GetGenesisInfoRequest{})
require.Nil(t, err, "GetGenesisInfo failed")
require.NotNil(t, genesisInfo, "GenesisInfo is nil")

// call get commitment state
commitmentState, err := serviceV1Alpha1.GetCommitmentState(context.Background(), &astriaPb.GetCommitmentStateRequest{})
require.Nil(t, err, "GetCommitmentState failed")
require.NotNil(t, commitmentState, "CommitmentState is nil")

ethservice.BlockChain().SetSafe(ethservice.BlockChain().CurrentBlock())

// get previous block hash
previousBlock := ethservice.BlockChain().CurrentSafeBlock()
require.NotNil(t, previousBlock, "Previous block not found")

gasLimit := ethservice.BlockChain().GasLimit()

stateDb, err := ethservice.BlockChain().StateAt(previousBlock.Root)
require.Nil(t, err, "Failed to get state db")

latestNonce := stateDb.GetNonce(testAddr)

// create 5 txs
txs := []*types.Transaction{}
marshalledTxs := []*sequencerblockv1alpha1.RollupData{}
for i := 0; i < 5; i++ {
unsignedTx := types.NewTransaction(latestNonce+uint64(i), testToAddress, big.NewInt(1), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil)
tx, err := types.SignTx(unsignedTx, types.LatestSigner(ethservice.BlockChain().Config()), testKey)
require.Nil(t, err, "Failed to sign tx")
txs = append(txs, tx)

marshalledTx, err := tx.MarshalBinary()
require.Nil(t, err, "Failed to marshal tx")
marshalledTxs = append(marshalledTxs, &sequencerblockv1alpha1.RollupData{
Value: &sequencerblockv1alpha1.RollupData_SequencedData{SequencedData: marshalledTx},
})
}

// add a tx with lesser gas than the base gas
unsignedTx := types.NewTransaction(latestNonce+uint64(5), testToAddress, big.NewInt(1), gasLimit, big.NewInt(params.InitialBaseFee*2), nil)
tx, err := types.SignTx(unsignedTx, types.LatestSigner(ethservice.BlockChain().Config()), testKey)
require.Nil(t, err, "Failed to sign tx")
txs = append(txs, tx)

marshalledTx, err := tx.MarshalBinary()
require.Nil(t, err, "Failed to marshal tx")
marshalledTxs = append(marshalledTxs, &sequencerblockv1alpha1.RollupData{
Value: &sequencerblockv1alpha1.RollupData_SequencedData{SequencedData: marshalledTx},
})

errors := ethservice.TxPool().Add(txs, true, false)
for _, err := range errors {
require.Nil(t, err, "Failed to add tx to pool")
}

pending, queued := ethservice.TxPool().Stats()
require.Equal(t, 6, pending, "Pending txs should be 6")
require.Equal(t, 0, queued, "Queued txs should be 0")

executeBlockReq := &astriaPb.ExecuteBlockRequest{
PrevBlockHash: previousBlock.Hash().Bytes(),
Timestamp: &timestamppb.Timestamp{
Seconds: int64(previousBlock.Time + 2),
},
Transactions: marshalledTxs,
}

executeBlockRes, err := serviceV1Alpha1.ExecuteBlock(context.Background(), executeBlockReq)
require.Nil(t, err, "ExecuteBlock failed")

require.NotNil(t, executeBlockRes, "ExecuteBlock response is nil")

// check if astria ordered txs are cleared
astriaOrdered := ethservice.TxPool().AstriaOrdered()
require.Equal(t, 0, astriaOrdered.Len(), "AstriaOrdered should be empty")

// call update commitment state to set the block we executed as soft and firm
updateCommitmentStateReq := &astriaPb.UpdateCommitmentStateRequest{
CommitmentState: &astriaPb.CommitmentState{
Soft: &astriaPb.Block{
Hash: executeBlockRes.Hash,
ParentBlockHash: executeBlockRes.ParentBlockHash,
Number: executeBlockRes.Number,
Timestamp: executeBlockRes.Timestamp,
},
Firm: &astriaPb.Block{
Hash: executeBlockRes.Hash,
ParentBlockHash: executeBlockRes.ParentBlockHash,
Number: executeBlockRes.Number,
Timestamp: executeBlockRes.Timestamp,
},
BaseCelestiaHeight: commitmentState.BaseCelestiaHeight + 1,
},
}

updateCommitmentStateRes, err := serviceV1Alpha1.UpdateCommitmentState(context.Background(), updateCommitmentStateReq)
require.Nil(t, err, "UpdateCommitmentState failed")
require.NotNil(t, updateCommitmentStateRes, "UpdateCommitmentState response should not be nil")
require.Equal(t, updateCommitmentStateRes, updateCommitmentStateReq.CommitmentState, "CommitmentState response should match request")

// get the soft and firm block
softBlock := ethservice.BlockChain().CurrentSafeBlock()
require.NotNil(t, softBlock, "SoftBlock is nil")
firmBlock := ethservice.BlockChain().CurrentFinalBlock()
require.NotNil(t, firmBlock, "FirmBlock is nil")

block := ethservice.BlockChain().GetBlockByNumber(softBlock.Number.Uint64())
require.NotNil(t, block, "Soft Block not found")
require.Equal(t, block.Transactions().Len(), 5, "Soft Block should have 5 txs")

// give the tx loop time to run
time.Sleep(1 * time.Millisecond)

// after the tx loop is run, all pending txs should be removed
pending, queued = ethservice.TxPool().Stats()
require.Equal(t, 0, pending, "Pending txs should be 0")
require.Equal(t, 0, queued, "Queued txs should be 0")

// check if the soft and firm block are set correctly
require.True(t, bytes.Equal(softBlock.Hash().Bytes(), updateCommitmentStateRes.Soft.Hash), "Soft Block Hashes do not match")
require.True(t, bytes.Equal(softBlock.ParentHash.Bytes(), updateCommitmentStateRes.Soft.ParentBlockHash), "Soft Block Parent Hash do not match")
require.Equal(t, softBlock.Number.Uint64(), uint64(updateCommitmentStateRes.Soft.Number), "Soft Block Number do not match")

require.True(t, bytes.Equal(firmBlock.Hash().Bytes(), updateCommitmentStateRes.Firm.Hash), "Firm Block Hashes do not match")
require.True(t, bytes.Equal(firmBlock.ParentHash.Bytes(), updateCommitmentStateRes.Firm.ParentBlockHash), "Firm Block Parent Hash do not match")
require.Equal(t, firmBlock.Number.Uint64(), uint64(updateCommitmentStateRes.Firm.Number), "Firm Block Number do not match")

celestiaBaseHeight := ethservice.BlockChain().CurrentBaseCelestiaHeight()
require.Equal(t, celestiaBaseHeight, updateCommitmentStateRes.BaseCelestiaHeight, "BaseCelestiaHeight should be updated in db")
}

0 comments on commit 459cc3d

Please sign in to comment.