Skip to content

Commit

Permalink
core/txpool: implement account exclusivity across subpools
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Jul 6, 2023
1 parent e4c0724 commit 4d28f8e
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 50 deletions.
47 changes: 40 additions & 7 deletions core/txpool/blobpool/blobpool.go
Expand Up @@ -288,7 +288,8 @@ type BlobTxShim struct {
// minimums will need to be done only starting at the swapped in/out nonce
// and leading up to the first no-change.
type BlobPool struct {
config Config // Pool configuration
config Config // Pool configuration
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools

store billy.Database // Persistent data store for the tx metadata and blobs
stored uint64 // Useful data size of all transactions on disk
Expand Down Expand Up @@ -337,7 +338,9 @@ func (p *BlobPool) Filter(tx *types.Transaction) bool {
// Init sets the gas price needed to keep a transaction in the pool and the chain
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings.
func (p *BlobPool) Init(gasTip *big.Int, head *types.Header) error {
func (p *BlobPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.AddressReserver) error {
p.reserve = reserve

var (
queuedir = filepath.Join(p.config.Datadir, pendingTransactionStore)
limbodir = filepath.Join(p.config.Datadir, limboedTransactionStore)
Expand Down Expand Up @@ -454,6 +457,9 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
return err
}
if _, ok := p.index[sender]; !ok {
if err := p.reserve(sender, true); err != nil {
return err
}
p.index[sender] = []*blobTxMeta{}
p.spent[sender] = new(uint256.Int)
}
Expand Down Expand Up @@ -507,6 +513,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if inclusions != nil { // only during reorgs will the heap will be initialized
heap.Remove(p.evict, p.evict.index[addr])
}
p.reserve(addr, false)

if gapped {
log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids)
} else {
Expand Down Expand Up @@ -631,6 +639,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
if inclusions != nil { // only during reorgs will the heap will be initialized
heap.Remove(p.evict, p.evict.index[addr])
}
p.reserve(addr, false)
} else {
p.index[addr] = txs
}
Expand Down Expand Up @@ -868,6 +877,10 @@ func (p *BlobPool) reinject(addr common.Address, tx *types.Transaction) {
meta := newBlobTxMeta(id, p.store.Size(id), tx)

if _, ok := p.index[addr]; !ok {
if err := p.reserve(addr, true); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
return
}
p.index[addr] = []*blobTxMeta{meta}
p.spent[addr] = meta.costCap
p.evict.Push(addr)
Expand Down Expand Up @@ -920,6 +933,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
} else {
delete(p.index, addr)
delete(p.spent, addr)
p.reserve(addr, false)
}
// Clear out the transactions from the data store
log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids)
Expand Down Expand Up @@ -1064,7 +1078,7 @@ func (p *BlobPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error

// Add inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restictions).
func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) error {
func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) (err error) {
// The blob pool blocks on adding a transaction. This is because blob txs are
// only even pulled form the network, so this method will act as the overload
// protection for fetches.
Expand All @@ -1082,6 +1096,25 @@ func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kz
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
return err
}
// If the address is not yet known, request exclusivity to track the account
// only by this subpool until all transactions are evicted
from, _ := types.Sender(p.signer, tx) // already validated above
if _, ok := p.index[from]; !ok {
if err := p.reserve(from, true); err != nil {
return err
}
defer func() {
// If the transaction is rejected by some post-validation check, remove
// the lock on the reservation set.
//
// Note, `err` here is the named error return, which will be initialized
// by a return statement before running deferred methods. Take care with
// removing or subscoping err as it will break this clause.
if err != nil {
p.reserve(from, false)
}
}()
}
// Transaction permitted into the pool from a nonce and cost perspective,
// insert it into the database and update the indices
blob, err := rlp.EncodeToBytes(&blobTx{Tx: tx, Blobs: blobs, Commits: commits, Proofs: proofs})
Expand All @@ -1096,10 +1129,9 @@ func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kz
meta := newBlobTxMeta(id, p.store.Size(id), tx)

var (
from, _ = types.Sender(p.signer, tx) // already validated above
next = p.state.GetNonce(from)
offset = int(tx.Nonce() - next)
newacc = false
next = p.state.GetNonce(from)
offset = int(tx.Nonce() - next)
newacc = false
)
if len(p.index[from]) > offset {
// Transaction replaces a previously queued one
Expand Down Expand Up @@ -1217,6 +1249,7 @@ func (p *BlobPool) drop() {
if last {
delete(p.index, from)
delete(p.spent, from)
p.reserve(from, false)
} else {
txs[len(txs)-1] = nil
txs = txs[:len(txs)-1]
Expand Down
38 changes: 33 additions & 5 deletions core/txpool/blobpool/blobpool_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"math/big"
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -159,6 +160,33 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) {
return bc.statedb, nil
}

// makeAddressReserver is a utility method to sanity check that accounts are
// properly reserved by the blobpool (no duplicate reserves or unreserves).
func makeAddressReserver() txpool.AddressReserver {
var (
reserved = make(map[common.Address]struct{})
lock sync.Mutex
)
return func(addr common.Address, reserve bool) error {
lock.Lock()
defer lock.Unlock()

_, exists := reserved[addr]
if reserve {
if exists {
panic("already reserved")
}
reserved[addr] = struct{}{}
return nil
}
if !exists {
panic("not reserved")
}
delete(reserved, addr)
return nil
}
}

// makeTx is a utility method to construct a random blob transaction and sign it
// with a valid key, only setting the interesting fields from the perspective of
// the blob pool.
Expand Down Expand Up @@ -460,7 +488,7 @@ func TestOpenDrops(t *testing.T) {
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain)
if err := pool.Init(big.NewInt(1), chain.CurrentBlock()); err != nil {
if err := pool.Init(big.NewInt(1), chain.CurrentBlock(), makeAddressReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
defer pool.Close()
Expand Down Expand Up @@ -569,7 +597,7 @@ func TestOpenIndex(t *testing.T) {
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain)
if err := pool.Init(big.NewInt(1), chain.CurrentBlock()); err != nil {
if err := pool.Init(big.NewInt(1), chain.CurrentBlock(), makeAddressReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
defer pool.Close()
Expand Down Expand Up @@ -660,7 +688,7 @@ func TestOpenHeap(t *testing.T) {
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain)
if err := pool.Init(big.NewInt(1), chain.CurrentBlock()); err != nil {
if err := pool.Init(big.NewInt(1), chain.CurrentBlock(), makeAddressReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
defer pool.Close()
Expand Down Expand Up @@ -740,7 +768,7 @@ func TestOpenCap(t *testing.T) {
statedb: statedb,
}
pool := New(Config{Datadir: storage, Datacap: datacap}, chain)
if err := pool.Init(big.NewInt(1), chain.CurrentBlock()); err != nil {
if err := pool.Init(big.NewInt(1), chain.CurrentBlock(), makeAddressReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
// Verify that enough transactions have been dropped to get the pool's size
Expand Down Expand Up @@ -1045,7 +1073,7 @@ func TestAdd(t *testing.T) {
statedb: statedb,
}
pool := New(Config{Datadir: storage}, chain)
if err := pool.Init(big.NewInt(1), chain.CurrentBlock()); err != nil {
if err := pool.Init(big.NewInt(1), chain.CurrentBlock(), makeAddressReserver()); err != nil {
t.Fatalf("test %d: failed to create blob pool: %v", i, err)
}
verifyPoolInternals(t, pool)
Expand Down
70 changes: 62 additions & 8 deletions core/txpool/legacypool/legacypool.go
Expand Up @@ -219,6 +219,7 @@ type LegacyPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk

reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
beats map[common.Address]time.Time // Last heartbeat from each known account
Expand Down Expand Up @@ -291,7 +292,10 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings. The internal
// goroutines will be spun up and the pool deemed operational afterwards.
func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header) error {
func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.AddressReserver) error {
// Set the address reserver to request exclusive access to pooled accounts
pool.reserve = reserve

// Set the basic pool parameters
pool.gasTip.Store(gasTip)
pool.reset(nil, head)
Expand Down Expand Up @@ -365,7 +369,7 @@ func (pool *LegacyPool) loop() {
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten()
for _, tx := range list {
pool.removeTx(tx.Hash(), true)
pool.removeTx(tx.Hash(), true, true)
}
queuedEvictionMeter.Mark(int64(len(list)))
}
Expand Down Expand Up @@ -428,7 +432,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) {
// pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
drop := pool.all.RemotesBelowTip(tip)
for _, tx := range drop {
pool.removeTx(tx.Hash(), false)
pool.removeTx(tx.Hash(), false, true)
}
pool.priced.Removed(len(drop))
}
Expand Down Expand Up @@ -643,10 +647,31 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
invalidTxMeter.Mark(1)
return false, err
}

// already validated by this point
from, _ := types.Sender(pool.signer, tx)

// If the address is not yet known, request exclusivity to track the account
// only by this subpool until all transactions are evicted
var (
_, hasPending = pool.pending[from]
_, hasQueued = pool.queue[from]
)
if !hasPending && !hasQueued {
if err := pool.reserve(from, true); err != nil {
return false, err
}
defer func() {
// If the transaction is rejected by some post-validation check, remove
// the lock on the reservation set.
//
// Note, `err` here is the named error return, which will be initialized
// by a return statement before running deferred methods. Take care with
// removing or subscoping err as it will break this clause.
if err != nil {
pool.reserve(from, false)
}
}()
}
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
Expand Down Expand Up @@ -701,7 +726,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
for _, tx := range drop {
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
dropped := pool.removeTx(tx.Hash(), false)

sender, _ := types.Sender(pool.signer, tx)
dropped := pool.removeTx(tx.Hash(), false, sender != from) // Don't unreserve the sender of the tx being added if last from the acc

pool.changesSinceReorg += dropped
}
}
Expand Down Expand Up @@ -1025,15 +1053,35 @@ func (pool *LegacyPool) Has(hash common.Hash) bool {

// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
//
// In unreserve is false, the account will not be relinquished to the main txpool
// even if there are no more references to it. This is used to handle a race when
// a tx being added, and it evicts a previously scheduled tx from the same account,
// which could lead to a premature release of the lock.
//
// Returns the number of transactions removed from the pending queue.
func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool) int {
func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bool) int {
// Fetch the transaction we wish to delete
tx := pool.all.Get(hash)
if tx == nil {
return 0
}
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion

// If after deletion there are no more transactions belonging to this account,
// relinquish the address reservation. It's a bit convoluted do this, via a
// defer, but it's safer vs. the many return pathways.
if unreserve {
defer func() {
var (
_, hasPending = pool.pending[addr]
_, hasQueued = pool.queue[addr]
)
if !hasPending && !hasQueued {
pool.reserve(addr, false)
}
}()
}
// Remove it from the list of known transactions
pool.all.Remove(hash)
if outofbound {
Expand Down Expand Up @@ -1419,6 +1467,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
if list.Empty() {
delete(pool.queue, addr)
delete(pool.beats, addr)
if _, ok := pool.pending[addr]; !ok {
pool.reserve(addr, false)
}
}
}
return promoted
Expand Down Expand Up @@ -1540,7 +1591,7 @@ func (pool *LegacyPool) truncateQueue() {
// Drop all transactions if they are less than the overflow
if size := uint64(list.Len()); size <= drop {
for _, tx := range list.Flatten() {
pool.removeTx(tx.Hash(), true)
pool.removeTx(tx.Hash(), true, true)
}
drop -= size
queuedRateLimitMeter.Mark(int64(size))
Expand All @@ -1549,7 +1600,7 @@ func (pool *LegacyPool) truncateQueue() {
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash(), true)
pool.removeTx(txs[i].Hash(), true, true)
drop--
queuedRateLimitMeter.Mark(1)
}
Expand Down Expand Up @@ -1611,6 +1662,9 @@ func (pool *LegacyPool) demoteUnexecutables() {
// Delete the entire pending entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
if _, ok := pool.queue[addr]; !ok {
pool.reserve(addr, false)
}
}
}
}
Expand Down

0 comments on commit 4d28f8e

Please sign in to comment.