From 4d28f8eead725ef0bfbde923ddc2f6b23f130509 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 6 Jul 2023 09:29:13 +0300 Subject: [PATCH] core/txpool: implement account exclusivity across subpools --- core/txpool/blobpool/blobpool.go | 47 ++++++++++++--- core/txpool/blobpool/blobpool_test.go | 38 ++++++++++-- core/txpool/legacypool/legacypool.go | 70 +++++++++++++++++++--- core/txpool/legacypool/legacypool2_test.go | 8 +-- core/txpool/legacypool/legacypool_test.go | 64 ++++++++++++++------ core/txpool/subpool.go | 6 +- core/txpool/txpool.go | 58 ++++++++++++++++-- 7 files changed, 241 insertions(+), 50 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 8c08a569975e2..c0bce81c77123 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -288,7 +288,8 @@ type BlobTxShim struct { // minimums will need to be done only starting at the swapped in/out nonce // and leading up to the first no-change. type BlobPool struct { - config Config // Pool configuration + config Config // Pool configuration + reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools store billy.Database // Persistent data store for the tx metadata and blobs stored uint64 // Useful data size of all transactions on disk @@ -337,7 +338,9 @@ func (p *BlobPool) Filter(tx *types.Transaction) bool { // Init sets the gas price needed to keep a transaction in the pool and the chain // head to allow balance / nonce checks. The transaction journal will be loaded // from disk and filtered based on the provided starting settings. -func (p *BlobPool) Init(gasTip *big.Int, head *types.Header) error { +func (p *BlobPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.AddressReserver) error { + p.reserve = reserve + var ( queuedir = filepath.Join(p.config.Datadir, pendingTransactionStore) limbodir = filepath.Join(p.config.Datadir, limboedTransactionStore) @@ -454,6 +457,9 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { return err } if _, ok := p.index[sender]; !ok { + if err := p.reserve(sender, true); err != nil { + return err + } p.index[sender] = []*blobTxMeta{} p.spent[sender] = new(uint256.Int) } @@ -507,6 +513,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if inclusions != nil { // only during reorgs will the heap will be initialized heap.Remove(p.evict, p.evict.index[addr]) } + p.reserve(addr, false) + if gapped { log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids) } else { @@ -631,6 +639,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if inclusions != nil { // only during reorgs will the heap will be initialized heap.Remove(p.evict, p.evict.index[addr]) } + p.reserve(addr, false) } else { p.index[addr] = txs } @@ -868,6 +877,10 @@ func (p *BlobPool) reinject(addr common.Address, tx *types.Transaction) { meta := newBlobTxMeta(id, p.store.Size(id), tx) if _, ok := p.index[addr]; !ok { + if err := p.reserve(addr, true); err != nil { + log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) + return + } p.index[addr] = []*blobTxMeta{meta} p.spent[addr] = meta.costCap p.evict.Push(addr) @@ -920,6 +933,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { } else { delete(p.index, addr) delete(p.spent, addr) + p.reserve(addr, false) } // Clear out the transactions from the data store log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids) @@ -1064,7 +1078,7 @@ func (p *BlobPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error // Add inserts a new blob transaction into the pool if it passes validation (both // consensus validity and pool restictions). -func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) error { +func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) (err error) { // The blob pool blocks on adding a transaction. This is because blob txs are // only even pulled form the network, so this method will act as the overload // protection for fetches. @@ -1082,6 +1096,25 @@ func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kz log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err) return err } + // If the address is not yet known, request exclusivity to track the account + // only by this subpool until all transactions are evicted + from, _ := types.Sender(p.signer, tx) // already validated above + if _, ok := p.index[from]; !ok { + if err := p.reserve(from, true); err != nil { + return err + } + defer func() { + // If the transaction is rejected by some post-validation check, remove + // the lock on the reservation set. + // + // Note, `err` here is the named error return, which will be initialized + // by a return statement before running deferred methods. Take care with + // removing or subscoping err as it will break this clause. + if err != nil { + p.reserve(from, false) + } + }() + } // Transaction permitted into the pool from a nonce and cost perspective, // insert it into the database and update the indices blob, err := rlp.EncodeToBytes(&blobTx{Tx: tx, Blobs: blobs, Commits: commits, Proofs: proofs}) @@ -1096,10 +1129,9 @@ func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kz meta := newBlobTxMeta(id, p.store.Size(id), tx) var ( - from, _ = types.Sender(p.signer, tx) // already validated above - next = p.state.GetNonce(from) - offset = int(tx.Nonce() - next) - newacc = false + next = p.state.GetNonce(from) + offset = int(tx.Nonce() - next) + newacc = false ) if len(p.index[from]) > offset { // Transaction replaces a previously queued one @@ -1217,6 +1249,7 @@ func (p *BlobPool) drop() { if last { delete(p.index, from) delete(p.spent, from) + p.reserve(from, false) } else { txs[len(txs)-1] = nil txs = txs[:len(txs)-1] diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index ce973766c0ce0..0d07b82bb750a 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -24,6 +24,7 @@ import ( "math/big" "os" "path/filepath" + "sync" "testing" "time" @@ -159,6 +160,33 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) { return bc.statedb, nil } +// makeAddressReserver is a utility method to sanity check that accounts are +// properly reserved by the blobpool (no duplicate reserves or unreserves). +func makeAddressReserver() txpool.AddressReserver { + var ( + reserved = make(map[common.Address]struct{}) + lock sync.Mutex + ) + return func(addr common.Address, reserve bool) error { + lock.Lock() + defer lock.Unlock() + + _, exists := reserved[addr] + if reserve { + if exists { + panic("already reserved") + } + reserved[addr] = struct{}{} + return nil + } + if !exists { + panic("not reserved") + } + delete(reserved, addr) + return nil + } +} + // makeTx is a utility method to construct a random blob transaction and sign it // with a valid key, only setting the interesting fields from the perspective of // the blob pool. @@ -460,7 +488,7 @@ func TestOpenDrops(t *testing.T) { statedb: statedb, } pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(big.NewInt(1), chain.CurrentBlock()); err != nil { + if err := pool.Init(big.NewInt(1), chain.CurrentBlock(), makeAddressReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } defer pool.Close() @@ -569,7 +597,7 @@ func TestOpenIndex(t *testing.T) { statedb: statedb, } pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(big.NewInt(1), chain.CurrentBlock()); err != nil { + if err := pool.Init(big.NewInt(1), chain.CurrentBlock(), makeAddressReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } defer pool.Close() @@ -660,7 +688,7 @@ func TestOpenHeap(t *testing.T) { statedb: statedb, } pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(big.NewInt(1), chain.CurrentBlock()); err != nil { + if err := pool.Init(big.NewInt(1), chain.CurrentBlock(), makeAddressReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } defer pool.Close() @@ -740,7 +768,7 @@ func TestOpenCap(t *testing.T) { statedb: statedb, } pool := New(Config{Datadir: storage, Datacap: datacap}, chain) - if err := pool.Init(big.NewInt(1), chain.CurrentBlock()); err != nil { + if err := pool.Init(big.NewInt(1), chain.CurrentBlock(), makeAddressReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } // Verify that enough transactions have been dropped to get the pool's size @@ -1045,7 +1073,7 @@ func TestAdd(t *testing.T) { statedb: statedb, } pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(big.NewInt(1), chain.CurrentBlock()); err != nil { + if err := pool.Init(big.NewInt(1), chain.CurrentBlock(), makeAddressReserver()); err != nil { t.Fatalf("test %d: failed to create blob pool: %v", i, err) } verifyPoolInternals(t, pool) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 95e1b526d8bea..5569ac135f8e9 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -219,6 +219,7 @@ type LegacyPool struct { locals *accountSet // Set of local transaction to exempt from eviction rules journal *journal // Journal of local transaction to back up to disk + reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools pending map[common.Address]*list // All currently processable transactions queue map[common.Address]*list // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account @@ -291,7 +292,10 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool { // head to allow balance / nonce checks. The transaction journal will be loaded // from disk and filtered based on the provided starting settings. The internal // goroutines will be spun up and the pool deemed operational afterwards. -func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header) error { +func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.AddressReserver) error { + // Set the address reserver to request exclusive access to pooled accounts + pool.reserve = reserve + // Set the basic pool parameters pool.gasTip.Store(gasTip) pool.reset(nil, head) @@ -365,7 +369,7 @@ func (pool *LegacyPool) loop() { if time.Since(pool.beats[addr]) > pool.config.Lifetime { list := pool.queue[addr].Flatten() for _, tx := range list { - pool.removeTx(tx.Hash(), true) + pool.removeTx(tx.Hash(), true, true) } queuedEvictionMeter.Mark(int64(len(list))) } @@ -428,7 +432,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) { // pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead drop := pool.all.RemotesBelowTip(tip) for _, tx := range drop { - pool.removeTx(tx.Hash(), false) + pool.removeTx(tx.Hash(), false, true) } pool.priced.Removed(len(drop)) } @@ -643,10 +647,31 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e invalidTxMeter.Mark(1) return false, err } - // already validated by this point from, _ := types.Sender(pool.signer, tx) + // If the address is not yet known, request exclusivity to track the account + // only by this subpool until all transactions are evicted + var ( + _, hasPending = pool.pending[from] + _, hasQueued = pool.queue[from] + ) + if !hasPending && !hasQueued { + if err := pool.reserve(from, true); err != nil { + return false, err + } + defer func() { + // If the transaction is rejected by some post-validation check, remove + // the lock on the reservation set. + // + // Note, `err` here is the named error return, which will be initialized + // by a return statement before running deferred methods. Take care with + // removing or subscoping err as it will break this clause. + if err != nil { + pool.reserve(from, false) + } + }() + } // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it @@ -701,7 +726,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e for _, tx := range drop { log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) underpricedTxMeter.Mark(1) - dropped := pool.removeTx(tx.Hash(), false) + + sender, _ := types.Sender(pool.signer, tx) + dropped := pool.removeTx(tx.Hash(), false, sender != from) // Don't unreserve the sender of the tx being added if last from the acc + pool.changesSinceReorg += dropped } } @@ -1025,8 +1053,14 @@ func (pool *LegacyPool) Has(hash common.Hash) bool { // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. +// +// In unreserve is false, the account will not be relinquished to the main txpool +// even if there are no more references to it. This is used to handle a race when +// a tx being added, and it evicts a previously scheduled tx from the same account, +// which could lead to a premature release of the lock. +// // Returns the number of transactions removed from the pending queue. -func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool) int { +func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bool) int { // Fetch the transaction we wish to delete tx := pool.all.Get(hash) if tx == nil { @@ -1034,6 +1068,20 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool) int { } addr, _ := types.Sender(pool.signer, tx) // already validated during insertion + // If after deletion there are no more transactions belonging to this account, + // relinquish the address reservation. It's a bit convoluted do this, via a + // defer, but it's safer vs. the many return pathways. + if unreserve { + defer func() { + var ( + _, hasPending = pool.pending[addr] + _, hasQueued = pool.queue[addr] + ) + if !hasPending && !hasQueued { + pool.reserve(addr, false) + } + }() + } // Remove it from the list of known transactions pool.all.Remove(hash) if outofbound { @@ -1419,6 +1467,9 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T if list.Empty() { delete(pool.queue, addr) delete(pool.beats, addr) + if _, ok := pool.pending[addr]; !ok { + pool.reserve(addr, false) + } } } return promoted @@ -1540,7 +1591,7 @@ func (pool *LegacyPool) truncateQueue() { // Drop all transactions if they are less than the overflow if size := uint64(list.Len()); size <= drop { for _, tx := range list.Flatten() { - pool.removeTx(tx.Hash(), true) + pool.removeTx(tx.Hash(), true, true) } drop -= size queuedRateLimitMeter.Mark(int64(size)) @@ -1549,7 +1600,7 @@ func (pool *LegacyPool) truncateQueue() { // Otherwise drop only last few transactions txs := list.Flatten() for i := len(txs) - 1; i >= 0 && drop > 0; i-- { - pool.removeTx(txs[i].Hash(), true) + pool.removeTx(txs[i].Hash(), true, true) drop-- queuedRateLimitMeter.Mark(1) } @@ -1611,6 +1662,9 @@ func (pool *LegacyPool) demoteUnexecutables() { // Delete the entire pending entry if it became empty. if list.Empty() { delete(pool.pending, addr) + if _, ok := pool.queue[addr]; !ok { + pool.reserve(addr, false) + } } } } diff --git a/core/txpool/legacypool/legacypool2_test.go b/core/txpool/legacypool/legacypool2_test.go index 5de34588afe2c..a73c1bb8a7724 100644 --- a/core/txpool/legacypool/legacypool2_test.go +++ b/core/txpool/legacypool/legacypool2_test.go @@ -84,7 +84,7 @@ func TestTransactionFutureAttack(t *testing.T) { config.GlobalQueue = 100 config.GlobalSlots = 100 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() fillPool(t, pool) pending, _ := pool.Stats() @@ -118,7 +118,7 @@ func TestTransactionFuture1559(t *testing.T) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts, fund them and make transactions @@ -151,7 +151,7 @@ func TestTransactionZAttack(t *testing.T) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts, fund them and make transactions fillPool(t, pool) @@ -222,7 +222,7 @@ func BenchmarkFutureAttack(b *testing.B) { config.GlobalQueue = 100 config.GlobalSlots = 100 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() fillPool(b, pool) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index e3302487b3b05..5ba78b5339480 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -24,6 +24,7 @@ import ( "math/big" "math/rand" "os" + "sync" "sync/atomic" "testing" "time" @@ -127,6 +128,31 @@ func dynamicFeeTx(nonce uint64, gaslimit uint64, gasFee *big.Int, tip *big.Int, return tx } +func makeAddressReserver() txpool.AddressReserver { + var ( + reserved = make(map[common.Address]struct{}) + lock sync.Mutex + ) + return func(addr common.Address, reserve bool) error { + lock.Lock() + defer lock.Unlock() + + _, exists := reserved[addr] + if reserve { + if exists { + panic("already reserved") + } + reserved[addr] = struct{}{} + return nil + } + if !exists { + panic("not reserved") + } + delete(reserved, addr) + return nil + } +} + func setupPool() (*LegacyPool, *ecdsa.PrivateKey) { return setupPoolWithConfig(params.TestChainConfig) } @@ -137,7 +163,7 @@ func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.Privat key, _ := crypto.GenerateKey() pool := New(testTxPoolConfig, blockchain) - if err := pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()); err != nil { + if err := pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()); err != nil { panic(err) } // wait for the pool to initialize @@ -256,7 +282,7 @@ func TestStateChangeDuringReset(t *testing.T) { tx1 := transaction(1, 100000, key) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() nonce := pool.Nonce(address) @@ -455,7 +481,7 @@ func TestChainFork(t *testing.T) { if _, err := pool.add(tx, false); err != nil { t.Error("didn't expect error", err) } - pool.removeTx(tx.Hash(), true) + pool.removeTx(tx.Hash(), true, true) // reset the pool's internal state resetState() @@ -676,7 +702,7 @@ func TestPostponing(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create two test accounts to produce different gap profiles with @@ -893,7 +919,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts and fund them (last one will be the local) @@ -986,7 +1012,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { config.NoLocals = nolocals pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create two test accounts to ensure remotes expire but locals do not @@ -1171,7 +1197,7 @@ func TestPendingGlobalLimiting(t *testing.T) { config.GlobalSlots = config.AccountSlots * 10 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1275,7 +1301,7 @@ func TestCapClearsFromAll(t *testing.T) { config.GlobalSlots = 8 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1308,7 +1334,7 @@ func TestPendingMinimumAllowance(t *testing.T) { config.GlobalSlots = 1 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1354,7 +1380,7 @@ func TestRepricing(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -1603,7 +1629,7 @@ func TestRepricingKeepsLocals(t *testing.T) { blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1681,7 +1707,7 @@ func TestUnderpricing(t *testing.T) { config.GlobalQueue = 2 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -1796,7 +1822,7 @@ func TestStableUnderpricing(t *testing.T) { config.GlobalQueue = 0 pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -2025,7 +2051,7 @@ func TestDeduplication(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create a test account to add transactions with @@ -2092,7 +2118,7 @@ func TestReplacement(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -2303,7 +2329,7 @@ func testJournaling(t *testing.T, nolocals bool) { config.Rejournal = time.Second pool := New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) // Create two test accounts to ensure remotes expire but locals do not local, _ := crypto.GenerateKey() @@ -2341,7 +2367,7 @@ func testJournaling(t *testing.T, nolocals bool) { blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool = New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) pending, queued = pool.Stats() if queued != 0 { @@ -2368,7 +2394,7 @@ func testJournaling(t *testing.T, nolocals bool) { statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool = New(config, blockchain) - pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(config.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) pending, queued = pool.Stats() if pending != 0 { @@ -2399,7 +2425,7 @@ func TestStatusCheck(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock()) + pool.Init(new(big.Int).SetUint64(testTxPoolConfig.PriceLimit), blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() // Create the test accounts to check various transaction statuses with diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 576e9e89ceeb2..70c0918e140c2 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -59,6 +59,10 @@ func (ltx *LazyTransaction) Resolve() *Transaction { return ltx.Tx } +// AddressReserver is passed by the main transaction pool to subpools, so they +// may request (and relinquish) exclusive access to certain addresses. +type AddressReserver func(addr common.Address, reserve bool) error + // SubPool represents a specialized transaction pool that lives on its own (e.g. // blob pool). Since independent of how many specialized pools we have, they do // need to be updated in lockstep and assemble into one coherent view for block @@ -76,7 +80,7 @@ type SubPool interface { // These should not be passed as a constructor argument - nor should the pools // start by themselves - in order to keep multiple subpools in lockstep with // one another. - Init(gasTip *big.Int, head *types.Header) error + Init(gasTip *big.Int, head *types.Header, reserve AddressReserver) error // Close terminates any background processing threads and releases any held // resources. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 35a809517af7e..2c9dadd0452b8 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -17,13 +17,16 @@ package txpool import ( + "errors" "fmt" "math/big" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" ) // TxStatus is the current status of a transaction as seen by the pool. @@ -52,9 +55,13 @@ type BlockChain interface { // They exit the pool when they are included in the blockchain or evicted due to // resource constraints. type TxPool struct { - subpools []SubPool // List of subpools for specialized transaction handling - subs event.SubscriptionScope // Subscription scope to unscubscribe all on shutdown - quit chan chan error // Quit channel to tear down the head updater + subpools []SubPool // List of subpools for specialized transaction handling + + reservations map[common.Address]SubPool // Map with the account to pool reservations + reserveLock sync.Mutex // Lock protecting the account reservations + + subs event.SubscriptionScope // Subscription scope to unscubscribe all on shutdown + quit chan chan error // Quit channel to tear down the head updater } // New creates a new transaction pool to gather, sort and filter inbound @@ -66,11 +73,12 @@ func New(gasTip *big.Int, chain BlockChain, subpools []SubPool) (*TxPool, error) head := chain.CurrentBlock() pool := &TxPool{ - subpools: subpools, - quit: make(chan chan error), + subpools: subpools, + reservations: make(map[common.Address]SubPool), + quit: make(chan chan error), } for i, subpool := range subpools { - if err := subpool.Init(gasTip, head); err != nil { + if err := subpool.Init(gasTip, head, pool.reserver(subpool)); err != nil { for j := i - 1; j >= 0; j-- { subpools[j].Close() } @@ -81,6 +89,44 @@ func New(gasTip *big.Int, chain BlockChain, subpools []SubPool) (*TxPool, error) return pool, nil } +// reserver is a method to create an address reservation callback to exclusively +// assign/deassign addresses to/from subpools. This can ensure that at any point +// in time, only a single subpool is able to manage an account, avoiding cross +// subpool eviction issues and nonce conflicts. +func (p *TxPool) reserver(subpool SubPool) AddressReserver { + return func(addr common.Address, reserve bool) error { + p.reserveLock.Lock() + defer p.reserveLock.Unlock() + + owner, exists := p.reservations[addr] + if reserve { + // Double reservations are forbidden even from the same pool to + // avoid subtle bugs in the long term. + if exists { + if owner == subpool { + log.Error("pool attempted to reserve already-owned address", "address", addr) + return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed + } + return errors.New("address already reserved") + } + p.reservations[addr] = subpool + return nil + } + // Ensure subpools only attempt to unreserve their own owned addresses, + // otherwise flag as a programming error. + if !exists { + log.Error("pool attempted to unreserve non-reserved address", "address", addr) + return errors.New("address not reserved") + } + if subpool != owner { + log.Error("pool attempted to unreserve non-owned address", "address", addr) + return errors.New("address not owned") + } + delete(p.reservations, addr) + return nil + } +} + // Close terminates the transaction pool and all its subpools. func (p *TxPool) Close() error { var errs []error