Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove txs which do not end up in a block out of the mempool #15

Merged
merged 4 commits into from
May 15, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
remove txs which do not end up in a block out of the mempool
bharath-123 committed May 14, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 4527d595101508f34c8a4365c7098d5e5d5553b2
8 changes: 5 additions & 3 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
@@ -334,9 +334,11 @@ func New(config Config, chain BlockChain) *BlobPool {
}
}

func (p *BlobPool) SetAstriaOrdered(types.Transactions) {}
func (p *BlobPool) ClearAstriaOrdered() {}
func (p *BlobPool) AstriaOrdered() *types.Transactions { return &types.Transactions{} }
func (p *BlobPool) SetAstriaOrdered(types.Transactions) {}
func (p *BlobPool) ClearAstriaOrdered() {}
func (p *BlobPool) UpdateAstriaInvalid(*types.Transaction) {}
func (p *BlobPool) AstriaInvalid() *types.Transactions { return &types.Transactions{} }
func (p *BlobPool) AstriaOrdered() *types.Transactions { return &types.Transactions{} }

// Filter returns whether the given transaction can be consumed by the blob pool.
func (p *BlobPool) Filter(tx *types.Transaction) bool {
40 changes: 27 additions & 13 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
@@ -103,7 +103,7 @@ var (

// Metrics related to the astria ordered txs
astriaValidMeter = metrics.GetOrRegisterMeter("astria/txpool/valid", nil)
astriaParsedMeter = metrics.GetOrRegisterMeter("astria/txpool/parsed", nil)
astriaInvalidMeter = metrics.GetOrRegisterMeter("astria/txpool/invalid", nil)
astriaRequestedMeter = metrics.GetOrRegisterMeter("astria/txpool/requested", nil)
)

@@ -280,32 +280,30 @@ func New(config Config, chain BlockChain) *LegacyPool {
}

type astriaOrdered struct {
valid types.Transactions
parsed types.Transactions
pool *LegacyPool
valid types.Transactions
invalid types.Transactions
pool *LegacyPool
}

func newAstriaOrdered(valid types.Transactions, parsed types.Transactions, pool *LegacyPool) *astriaOrdered {
astriaParsedMeter.Mark(int64(len(parsed)))
func newAstriaOrdered(valid types.Transactions, pool *LegacyPool) *astriaOrdered {
astriaValidMeter.Mark(int64(len(valid)))

return &astriaOrdered{
valid: valid,
parsed: parsed,
pool: pool,
valid: valid,
invalid: types.Transactions{},
pool: pool,
}
}

func (ao *astriaOrdered) clear() {
ao.valid = types.Transactions{}
ao.parsed = types.Transactions{}
ao.invalid = types.Transactions{}
}

func (pool *LegacyPool) SetAstriaOrdered(txs types.Transactions) {
astriaRequestedMeter.Mark(int64(len(txs)))

valid := []*types.Transaction{}
parsed := []*types.Transaction{}
for idx, tx := range txs {
err := pool.validateTxBasics(tx, false)
if err != nil {
@@ -316,15 +314,31 @@ func (pool *LegacyPool) SetAstriaOrdered(txs types.Transactions) {
valid = append(valid, tx)
}

pool.astria = newAstriaOrdered(types.Transactions(valid), types.Transactions(parsed), pool)
pool.astria = newAstriaOrdered(valid, pool)
}

func (pool *LegacyPool) UpdateAstriaInvalid(tx *types.Transaction) {
if pool.astria.invalid == nil {
pool.astria.invalid = types.Transactions{tx}
}

pool.astria.invalid = append(pool.astria.invalid, tx)
}

func (pool *LegacyPool) AstriaInvalid() *types.Transactions {
if pool.astria == nil {
return &types.Transactions{}
}
return &pool.astria.invalid
}

func (pool *LegacyPool) ClearAstriaOrdered() {
if pool.astria == nil {
return
}

for _, tx := range pool.astria.parsed {
astriaInvalidMeter.Mark(int64(len(pool.astria.invalid)))
for _, tx := range pool.astria.invalid {
pool.removeTx(tx.Hash(), false, true)
}

2 changes: 2 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
@@ -140,5 +140,7 @@ type SubPool interface {

SetAstriaOrdered(types.Transactions)
ClearAstriaOrdered()
UpdateAstriaInvalid(tx *types.Transaction)
AstriaInvalid() *types.Transactions
AstriaOrdered() *types.Transactions
}
17 changes: 17 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
@@ -273,6 +273,23 @@ func (p *TxPool) ClearAstriaOrdered() {
}
}

func (p *TxPool) UpdateAstriaInvalid(tx *types.Transaction) {
for _, subpool := range p.subpools {
subpool.UpdateAstriaInvalid(tx)
}
}

func (p *TxPool) AstriaInvalid() *types.Transactions {
txs := types.Transactions{}

for _, subpool := range p.subpools {
subpoolTxs := subpool.AstriaInvalid()
txs = append(txs, *subpoolTxs...)
}

return &txs
}

func (p *TxPool) AstriaOrdered() *types.Transactions {
txs := types.Transactions{}

7 changes: 2 additions & 5 deletions miner/payload_building.go
Original file line number Diff line number Diff line change
@@ -176,10 +176,7 @@ func (payload *Payload) ResolveFull() *engine.ExecutionPayloadEnvelope {

// buildPayload builds the payload according to the provided parameters.
func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
// Build the initial version with no transaction included. It should be fast
// enough to run. The empty payload can at least make sure there is something
// to deliver for not missing slot.
emptyParams := &generateParams{
fullParams := &generateParams{
timestamp: args.Timestamp,
forceTime: true,
parentHash: args.Parent,
@@ -190,7 +187,7 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
noTxs: false,
}
start := time.Now()
full := w.getSealingBlock(emptyParams)
full := w.getSealingBlock(fullParams)
Copy link
Contributor Author

@bharath-123 bharath-123 May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In vanilla geth, an empty block is created and then a full block is created. This is mainly to spend more time to optimize for gas fees and also to return at least an empty block if we were not able to create a full block on time.
We don't need this astria since we just blindly use the txs received from ExecuteBlock. so fullParams makes more sense than emptyParams over here.

if full.err != nil {
return nil, full.err
}
157 changes: 135 additions & 22 deletions miner/payload_building_test.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
package miner

import (
"math/big"
"reflect"
"testing"
"time"
@@ -38,16 +39,7 @@ func TestBuildPayload(t *testing.T) {
defer w.close()

timestamp := uint64(time.Now().Unix())
args := &BuildPayloadArgs{
Parent: b.chain.CurrentBlock().Hash(),
Timestamp: timestamp,
Random: common.Hash{},
FeeRecipient: recipient,
}
payload, err := w.buildPayload(args)
if err != nil {
t.Fatalf("Failed to build payload %v", err)
}

verify := func(outer *engine.ExecutionPayloadEnvelope, txs int) {
payload := outer.ExecutionPayload
if payload.ParentHash != b.chain.CurrentBlock().Hash() {
@@ -66,18 +58,139 @@ func TestBuildPayload(t *testing.T) {
t.Fatal("Unexpect transaction set")
}
}
empty := payload.ResolveEmpty()
verify(empty, 0)

full := payload.ResolveFull()
verify(full, len(pendingTxs))

// Ensure resolve can be called multiple times and the
// result should be unchanged
dataOne := payload.Resolve()
dataTwo := payload.Resolve()
if !reflect.DeepEqual(dataOne, dataTwo) {
t.Fatal("Unexpected payload data")

txGasPrice := big.NewInt(10 * params.InitialBaseFee)

tests := []struct {
name string
txsToBuildPayload types.Transactions
expectedTxsInPayload types.Transactions
invalidTxs types.Transactions
}{
{
name: "empty",
txsToBuildPayload: types.Transactions{},
expectedTxsInPayload: types.Transactions{},
invalidTxs: types.Transactions{},
},
{
name: "transactions with gas enough to fit into a single block",
txsToBuildPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
invalidTxs: types.Transactions{},
},
{
name: "transactions with gas which doesn't fit in a single block",
txsToBuildPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
},
invalidTxs: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(1000), b.BlockChain().GasLimit()-10000, txGasPrice, nil),
},
},
{
name: "transactions with nonce too high",
txsToBuildPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)+4, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
},
invalidTxs: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress)+4, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
},
{
name: "transactions with nonce too low",
txsToBuildPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
types.NewTransaction(b.txPool.Nonce(testBankAddress)-1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
expectedTxsInPayload: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(1000), params.TxGas, txGasPrice, nil),
},
invalidTxs: types.Transactions{
types.NewTransaction(b.txPool.Nonce(testBankAddress)-1, testUserAddress, big.NewInt(2000), params.TxGas, txGasPrice, nil),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
signedTxs := types.Transactions{}
signedInvalidTxs := types.Transactions{}

for _, tx := range tt.txsToBuildPayload {
signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, testBankKey)
if err != nil {
t.Fatalf("Failed to sign tx %v", err)
}
signedTxs = append(signedTxs, signedTx)
}

for _, tx := range tt.invalidTxs {
signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, testBankKey)
if err != nil {
t.Fatalf("Failed to sign tx %v", err)
}
signedInvalidTxs = append(signedInvalidTxs, signedTx)
}

// set the astria ordered txsToBuildPayload
b.TxPool().SetAstriaOrdered(signedTxs)
astriaTxs := b.TxPool().AstriaOrdered()

if astriaTxs.Len() != len(tt.txsToBuildPayload) {
t.Fatalf("Unexpected number of astria ordered transactions: %d", astriaTxs.Len())
}

txs := types.TxDifference(*astriaTxs, signedTxs)
if txs.Len() != 0 {
t.Fatalf("Unexpected transactions in astria ordered transactions: %v", txs)
}

args := &BuildPayloadArgs{
Parent: b.chain.CurrentBlock().Hash(),
Timestamp: timestamp,
Random: common.Hash{},
FeeRecipient: recipient,
}

payload, err := w.buildPayload(args)
if err != nil {
t.Fatalf("Failed to build payload %v", err)
}
full := payload.ResolveFull()
verify(full, len(tt.expectedTxsInPayload))

// Ensure resolve can be called multiple times and the
// result should be unchanged
dataOne := payload.Resolve()
dataTwo := payload.Resolve()
if !reflect.DeepEqual(dataOne, dataTwo) {
t.Fatal("Unexpected payload data")
}

// Ensure invalid transactions are stored
if len(tt.invalidTxs) > 0 {
invalidTxs := b.TxPool().AstriaInvalid()
txDifference := types.TxDifference(*invalidTxs, signedInvalidTxs)
if txDifference.Len() != 0 {
t.Fatalf("Unexpected invalid transactions in astria invalid transactions: %v", txDifference)
}
}
})
}
}

10 changes: 8 additions & 2 deletions miner/worker.go
Original file line number Diff line number Diff line change
@@ -810,6 +810,8 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti
// If we don't have enough gas for any further transactions then we're done.
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
// remove txs from the mempool if they are too big for this block
w.eth.TxPool().UpdateAstriaInvalid(tx)
break
}

@@ -821,7 +823,7 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block)

w.eth.TxPool().UpdateAstriaInvalid(tx)
continue
}
// Start executing the transaction
@@ -839,7 +841,7 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti

case errors.Is(err, core.ErrNonceTooHigh):
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
log.Trace("Skipping account with high nonce", "sender", from, "nonce", tx.Nonce())

case errors.Is(err, nil):
// Everything ok, collect the logs and shift in the next transaction from the same account
@@ -855,6 +857,10 @@ func (w *worker) commitAstriaTransactions(env *environment, txs *types.Transacti
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
}
if err != nil {
log.Trace("Marking transaction as invalid", "hash", tx.Hash(), "err", err)
w.eth.TxPool().UpdateAstriaInvalid(tx)
Copy link
Contributor Author

@bharath-123 bharath-123 May 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also remove if a tx is skipped because of a high nonce or low nonce? or only because it exceeded the gas limit?

Copy link
Contributor Author

@bharath-123 bharath-123 May 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We ideally should to maintain 1:1 block determinism. So we should include all of these cases.

}
}
if !w.isRunning() && len(coalescedLogs) > 0 {
// We don't push the pendingLogsEvent while we are sealing. The reason is that
1 change: 0 additions & 1 deletion miner/worker_test.go
Original file line number Diff line number Diff line change
@@ -160,7 +160,6 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction {

func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks)
backend.txPool.Add(pendingTxs, true, false)
w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false)
w.setEtherbase(testBankAddress)
return w, backend