diff --git a/zk/stages/stage_sequence_execute_transactions.go b/zk/stages/stage_sequence_execute_transactions.go index 6adc3b9ae53..b71c5e2f88f 100644 --- a/zk/stages/stage_sequence_execute_transactions.go +++ b/zk/stages/stage_sequence_execute_transactions.go @@ -39,10 +39,13 @@ func getNextPoolTransactions(ctx context.Context, cfg SequenceBlockCfg, executio if allConditionsOk, _, err = cfg.txPool.YieldBest(cfg.yieldSize, &slots, poolTx, executionAt, gasLimit, alreadyYielded); err != nil { return err } - yieldedTxs, err := extractTransactionsFromSlot(&slots) + yieldedTxs, toRemove, err := extractTransactionsFromSlot(&slots) if err != nil { return err } + for _, txId := range toRemove { + cfg.txPool.MarkForDiscardFromPendingBest(txId) + } transactions = append(transactions, yieldedTxs...) return nil }); err != nil { @@ -65,7 +68,9 @@ func getLimboTransaction(ctx context.Context, cfg SequenceBlockCfg, txHash *comm } if slots != nil { - transactions, err = extractTransactionsFromSlot(slots) + // ignore the toRemove value here, we know the RLP will be sound as we had to read it from the pool + // in the first place to get it into limbo + transactions, _, err = extractTransactionsFromSlot(slots) if err != nil { return err } @@ -79,10 +84,11 @@ func getLimboTransaction(ctx context.Context, cfg SequenceBlockCfg, txHash *comm return transactions, nil } -func extractTransactionsFromSlot(slot *types2.TxsRlp) ([]types.Transaction, error) { +func extractTransactionsFromSlot(slot *types2.TxsRlp) ([]types.Transaction, []common.Hash, error) { transactions := make([]types.Transaction, 0, len(slot.Txs)) reader := bytes.NewReader([]byte{}) stream := new(rlp.Stream) + toRemove := make([]common.Hash, 0) for idx, txBytes := range slot.Txs { reader.Reset(txBytes) stream.Reset(reader, uint64(len(txBytes))) @@ -91,14 +97,18 @@ func extractTransactionsFromSlot(slot *types2.TxsRlp) ([]types.Transaction, erro continue } if err != nil { - return nil, err + // we have a transaction that cannot be decoded or a similar issue. We don't want to handle + // this tx so just WARN about it and remove it from the pool and continue + log.Warn("Failed to decode transaction from pool, skipping and removing from pool", "error", err) + toRemove = append(toRemove, slot.TxIds[idx]) + continue } var sender common.Address copy(sender[:], slot.Senders.At(idx)) transaction.SetSender(sender) transactions = append(transactions, transaction) } - return transactions, nil + return transactions, toRemove, nil } type overflowType uint8 diff --git a/zk/txpool/pool_zk.go b/zk/txpool/pool_zk.go index 6a653159c2d..ff0cbb3224e 100644 --- a/zk/txpool/pool_zk.go +++ b/zk/txpool/pool_zk.go @@ -220,6 +220,7 @@ func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableG } txs.Txs[count] = rlpTx + txs.TxIds[count] = mt.Tx.IDHash copy(txs.Senders.At(count), sender.Bytes()) txs.IsLocal[count] = isLocal toSkip.Add(mt.Tx.IDHash) diff --git a/zk/txpool/pool_zk_limbo.go b/zk/txpool/pool_zk_limbo.go index 2fef5802b4e..f64d9613fcd 100644 --- a/zk/txpool/pool_zk_limbo.go +++ b/zk/txpool/pool_zk_limbo.go @@ -229,6 +229,7 @@ func (p *TxPool) GetLimboTxRplsByHash(tx kv.Tx, txHash *common.Hash) (*types.Txs for i := uint32(0); i < txSize; i++ { limboTx := limboBlock.Transactions[i] txsRlps.Txs[i] = limboTx.Rlp + txsRlps.TxIds[i] = limboTx.Hash copy(txsRlps.Senders.At(int(i)), limboTx.Sender[:]) txsRlps.IsLocal[i] = true // all limbo tx are considered local //TODO: explain better about local }