Skip to content

Commit

Permalink
core/txpool: abstraction prep work for secondary pools (blob pool)
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Jun 6, 2023
1 parent c537ace commit f1b9fff
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 179 deletions.
2 changes: 1 addition & 1 deletion cmd/geth/main.go
Expand Up @@ -419,7 +419,7 @@ func startNode(ctx *cli.Context, stack *node.Node, backend ethapi.Backend, isCon
}
// Set the gas price to the limits from the CLI and start mining
gasprice := flags.GlobalBig(ctx, utils.MinerGasPriceFlag.Name)
ethBackend.TxPool().SetGasPrice(gasprice)
ethBackend.TxPool().SetGasTip(gasprice)
if err := ethBackend.StartMining(); err != nil {
utils.Fatalf("Failed to start mining: %v", err)
}
Expand Down
201 changes: 75 additions & 126 deletions core/txpool/txpool.go
Expand Up @@ -18,7 +18,6 @@ package txpool

import (
"errors"
"fmt"
"math"
"math/big"
"sort"
Expand Down Expand Up @@ -91,10 +90,6 @@ var (
// ErrFutureReplacePending is returned if a future transaction replaces a pending
// transaction. Future transactions should only be able to replace other future transactions.
ErrFutureReplacePending = errors.New("future transaction tries to replace pending")

// ErrOverdraft is returned if a transaction would cause the senders balance to go negative
// thus invalidating a potential large number of transactions.
ErrOverdraft = errors.New("transaction would cause overdraft")
)

var (
Expand Down Expand Up @@ -178,8 +173,7 @@ type Config struct {
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}

// DefaultConfig contains the default configurations for the transaction
// pool.
// DefaultConfig contains the default configurations for the transaction pool.
var DefaultConfig = Config{
Journal: "transactions.rlp",
Rejournal: time.Hour,
Expand Down Expand Up @@ -245,20 +239,15 @@ type TxPool struct {
config Config
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
gasTip atomic.Pointer[big.Int]
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

istanbul atomic.Bool // Fork indicator whether we are in the istanbul stage.
eip2718 atomic.Bool // Fork indicator whether we are using EIP-2718 type transactions.
eip1559 atomic.Bool // Fork indicator whether we are using EIP-1559 type transactions.
shanghai atomic.Bool // Fork indicator whether we are in the Shanghai stage.

currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces
currentMaxGas atomic.Uint64 // Current gas limit for transaction caps
currentHead atomic.Pointer[types.Header] // Current head of the blockchain
currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces

locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk
Expand All @@ -280,15 +269,17 @@ type TxPool struct {
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.

subpools []SubPool // List of subpools for specialized transaction handling
}

type txpoolResetRequest struct {
oldHead, newHead *types.Header
}

// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// New creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
func New(config Config, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()

Expand All @@ -309,8 +300,8 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.gasTip.Store(new(big.Int).SetUint64(config.PriceLimit))
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
Expand Down Expand Up @@ -343,6 +334,16 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
return pool
}

// AddSubPool injects a specialized pool into the main transaction pool to have
// a consistent view of the chain state across both of them.
func (pool *TxPool) AddSubPool(subpool SubPool) {
pool.mu.Lock()
defer pool.mu.Unlock()

pool.subpools = append(pool.subpools, subpool)
subpool.Reset(nil, pool.currentHead.Load())
}

// loop is the transaction pool's main event loop, waiting for and reacting to
// outside blockchain events as well as for various reporting and transaction
// eviction events.
Expand Down Expand Up @@ -443,33 +444,29 @@ func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subsc
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
defer pool.mu.RUnlock()

return new(big.Int).Set(pool.gasPrice)
}

// SetGasPrice updates the minimum price required by the transaction pool for a
// SetGasTip updates the minimum gas tip required by the transaction pool for a
// new transaction, and drops all transactions below this threshold.
func (pool *TxPool) SetGasPrice(price *big.Int) {
func (pool *TxPool) SetGasTip(tip *big.Int) {
pool.mu.Lock()
defer pool.mu.Unlock()

old := pool.gasPrice
pool.gasPrice = price
// if the min miner fee increased, remove transactions below the new threshold
if price.Cmp(old) > 0 {
old := pool.gasTip.Load()
pool.gasTip.Store(new(big.Int).Set(tip))

// If the min miner fee increased, remove transactions below the new threshold
if tip.Cmp(old) > 0 {
// pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
drop := pool.all.RemotesBelowTip(price)
drop := pool.all.RemotesBelowTip(tip)
for _, tx := range drop {
pool.removeTx(tx.Hash(), false)
}
pool.priced.Removed(len(drop))
}

log.Info("Transaction pool price threshold updated", "price", price)
// Propagate the new gas tip requirement to all the subpools too
for _, subpool := range pool.subpools {
subpool.SetGasTip(tip)
}
log.Info("Transaction pool tip threshold updated", "tip", tip)
}

// Nonce returns the next nonce of an account, with all transactions executable
Expand Down Expand Up @@ -556,7 +553,7 @@ func (pool *TxPool) Pending(enforceTips bool) map[common.Address]types.Transacti
// If the miner requests tip enforcement, cap the lists now
if enforceTips && !pool.locals.contains(addr) {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(pool.gasPrice, pool.priced.urgent.baseFee) < 0 {
if tx.EffectiveGasTipIntCmp(pool.gasTip.Load(), pool.priced.urgent.baseFee) < 0 {
txs = txs[:i]
break
}
Expand Down Expand Up @@ -598,93 +595,49 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {
// This check is meant as an early check which only needs to be performed once,
// and does not require the pool mutex to be held.
func (pool *TxPool) validateTxBasics(tx *types.Transaction, local bool) error {
// Accept only legacy transactions until EIP-2718/2930 activates.
if !pool.eip2718.Load() && tx.Type() != types.LegacyTxType {
return core.ErrTxTypeNotSupported
}
// Reject dynamic fee transactions until EIP-1559 activates.
if !pool.eip1559.Load() && tx.Type() == types.DynamicFeeTxType {
return core.ErrTxTypeNotSupported
}
// Reject blob transactions forever, those will have their own pool.
if tx.Type() == types.BlobTxType {
return core.ErrTxTypeNotSupported
}
// Reject transactions over defined size to prevent DOS attacks
if tx.Size() > txMaxSize {
return ErrOversizedData
}
// Check whether the init code size has been exceeded.
if pool.shanghai.Load() && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize {
return fmt.Errorf("%w: code size %v limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize)
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur if you create a transaction using the RPC.
if tx.Value().Sign() < 0 {
return ErrNegativeValue
opts := &ValidationOptions{
Config: pool.chainconfig,
Accept: map[uint8]struct{}{
types.LegacyTxType: {},
types.AccessListTxType: {},
types.DynamicFeeTxType: {},
},
MaxSize: txMaxSize,
MinTip: pool.gasTip.Load(),
}
// Ensure the transaction doesn't exceed the current block limit gas.
if pool.currentMaxGas.Load() < tx.Gas() {
return ErrGasLimit
}
// Sanity check for extremely large numbers
if tx.GasFeeCap().BitLen() > 256 {
return core.ErrFeeCapVeryHigh
}
if tx.GasTipCap().BitLen() > 256 {
return core.ErrTipVeryHigh
}
// Ensure gasFeeCap is greater than or equal to gasTipCap.
if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
return core.ErrTipAboveFeeCap
}
// Make sure the transaction is signed properly.
if _, err := types.Sender(pool.signer, tx); err != nil {
return ErrInvalidSender
}
// Drop non-local transactions under our own minimal accepted gas price or tip
if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 {
return ErrUnderpriced
if local {
opts.MinTip = new(big.Int)
}
// Ensure the transaction has more gas than the basic tx fee.
intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul.Load(), pool.shanghai.Load())
if err != nil {
if err := ValidateTransaction(tx, nil, nil, nil, pool.currentHead.Load(), pool.signer, opts); err != nil {
return err
}
if tx.Gas() < intrGas {
return core.ErrIntrinsicGas
}
return nil
}

// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
// Signature has been checked already, this cannot error.
from, _ := types.Sender(pool.signer, tx)
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() {
return core.ErrNonceTooLow
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
balance := pool.currentState.GetBalance(from)
if balance.Cmp(tx.Cost()) < 0 {
return core.ErrInsufficientFunds
}

// Verify that replacing transactions will not result in overdraft
list := pool.pending[from]
if list != nil { // Sender already has pending txs
sum := new(big.Int).Add(tx.Cost(), list.totalcost)
if repl := list.txs.Get(tx.Nonce()); repl != nil {
// Deduct the cost of a transaction replaced by this
sum.Sub(sum, repl.Cost())
}
if balance.Cmp(sum) < 0 {
log.Trace("Replacing transactions would overdraft", "sender", from, "balance", pool.currentState.GetBalance(from), "required", sum)
return ErrOverdraft
}
opts := &ValidationOptionsWithState{
State: pool.currentState,

FirstNonceGap: nil, // Pool allows arbitrary arrival order, don't invalidate nonce gaps
ExistingExpenditure: func(addr common.Address) *big.Int {
if list := pool.pending[addr]; list != nil {
return list.totalcost
}
return new(big.Int)
},
ExistingCost: func(addr common.Address, nonce uint64) *big.Int {
if list := pool.pending[addr]; list != nil {
if tx := list.txs.Get(nonce); tx != nil {
return tx.Cost()
}
}
return nil
},
}
if err := ValidateTransactionWithState(tx, pool.signer, opts); err != nil {
return err
}
return nil
}
Expand Down Expand Up @@ -993,7 +946,6 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// Exclude transactions with basic errors, e.g invalid signatures and
// insufficient intrinsic gas as soon as possible and cache senders
// in transactions before obtaining lock

if err := pool.validateTxBasics(tx, local); err != nil {
errs[i] = err
invalidTxMeter.Mark(1)
Expand Down Expand Up @@ -1249,7 +1201,9 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
if reset != nil {
// Reset from the old head to the new, rescheduling any reorged transactions
pool.reset(reset.oldHead, reset.newHead)

for _, subpool := range pool.subpools {
subpool.Reset(reset.oldHead, reset.newHead)
}
// Nonces were reset, discard any events that became stale
for addr := range events {
events[addr].Forward(pool.pendingNonces.get(addr))
Expand Down Expand Up @@ -1383,21 +1337,14 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) {
log.Error("Failed to reset txpool state", "err", err)
return
}
pool.currentHead.Store(newHead)
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)
pool.currentMaxGas.Store(newHead.GasLimit)

// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
core.SenderCacher.Recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)

// Update all fork indicator by next pending block number.
next := new(big.Int).Add(newHead.Number, big.NewInt(1))
pool.istanbul.Store(pool.chainconfig.IsIstanbul(next))
pool.eip2718.Store(pool.chainconfig.IsBerlin(next))
pool.eip1559.Store(pool.chainconfig.IsLondon(next))
pool.shanghai.Store(pool.chainconfig.IsShanghai(next, uint64(time.Now().Unix())))
}

// promoteExecutables moves transactions that have become processable from the
Expand All @@ -1408,6 +1355,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
var promoted []*types.Transaction

// Iterate over all accounts and promote any executable transactions
gasLimit := pool.currentHead.Load().GasLimit
for _, addr := range accounts {
list := pool.queue[addr]
if list == nil {
Expand All @@ -1421,7 +1369,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans
}
log.Trace("Removed old queued transactions", "count", len(forwards))
// Drop all transactions that are too costly (low balance or out of gas)
drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas.Load())
drops, _ := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
hash := tx.Hash()
pool.all.Remove(hash)
Expand Down Expand Up @@ -1607,6 +1555,7 @@ func (pool *TxPool) truncateQueue() {
// to trigger a re-heap is this function
func (pool *TxPool) demoteUnexecutables() {
// Iterate over all accounts and demote any non-executable transactions
gasLimit := pool.currentHead.Load().GasLimit
for addr, list := range pool.pending {
nonce := pool.currentState.GetNonce(addr)

Expand All @@ -1618,7 +1567,7 @@ func (pool *TxPool) demoteUnexecutables() {
log.Trace("Removed old pending transaction", "hash", hash)
}
// Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas.Load())
drops, invalids := list.Filter(pool.currentState.GetBalance(addr), gasLimit)
for _, tx := range drops {
hash := tx.Hash()
log.Trace("Removed unpayable pending transaction", "hash", hash)
Expand Down

0 comments on commit f1b9fff

Please sign in to comment.