Skip to content

Commit

Permalink
core, eth, miner: start propagating and consuming blob txs (#28243)
Browse files Browse the repository at this point in the history
* core, eth, miner: start propagating and consuming blob txs

* eth/protocols/eth: disable eth/67 if Cancun is enabled

* core/txpool, eth, miner: pass gas limit infos in lazy tx for mienr filtering

* core/txpool, miner: add lazy resolver for pending txs too

* core, eth: fix review noticed bugs

* eth, miner: minor polishes in the mining and announcing logs

* core/expool: unsubscribe the event scope
  • Loading branch information
karalabe committed Oct 4, 2023
1 parent bc6d184 commit a8a9c8e
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 73 deletions.
55 changes: 40 additions & 15 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ type blobTxMeta struct {
execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump
execFeeCap *uint256.Int // Needed to validate replacement price bump
blobFeeCap *uint256.Int // Needed to validate replacement price bump
execGas uint64 // Needed to check inclusion validity before reading the blob
blobGas uint64 // Needed to check inclusion validity before reading the blob

basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap
blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap
Expand All @@ -118,6 +120,8 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
execTipCap: uint256.MustFromBig(tx.GasTipCap()),
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
execGas: tx.Gas(),
blobGas: tx.BlobGas(),
}
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
Expand Down Expand Up @@ -307,8 +311,8 @@ type BlobPool struct {
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
evict *evictHeap // Heap of cheapest accounts for eviction when full

eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination
discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)

lock sync.RWMutex // Mutex protecting the pool during reorg handling
}
Expand Down Expand Up @@ -436,8 +440,6 @@ func (p *BlobPool) Close() error {
if err := p.store.Close(); err != nil {
errs = append(errs, err)
}
p.eventScope.Close()

switch {
case errs == nil:
return nil
Expand Down Expand Up @@ -758,15 +760,21 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
// Run the reorg between the old and new head and figure out which accounts
// need to be rechecked and which transactions need to be readded
if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil {
var adds []*types.Transaction
for addr, txs := range reinject {
// Blindly push all the lost transactions back into the pool
for _, tx := range txs {
p.reinject(addr, tx.Hash())
if err := p.reinject(addr, tx.Hash()); err == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
}
}
// Recheck the account's pooled transactions to drop included and
// invalidated one
p.recheck(addr, inclusions)
}
if len(adds) > 0 {
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
}
// Flush out any blobs from limbo that are older than the latest finality
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
Expand Down Expand Up @@ -921,13 +929,13 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
// Note, the method will not initialize the eviction cache values as those will
// be done once for all transactions belonging to an account after all individual
// transactions are injected back into the pool.
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable.
tx, err := p.limbo.pull(txhash)
if err != nil {
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
return
return err
}
// TODO: seems like an easy optimization here would be getting the serialized tx
// from limbo instead of re-serializing it here.
Expand All @@ -936,20 +944,20 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
blob, err := rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return
return err
}
id, err := p.store.Put(blob)
if err != nil {
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
return
return err
}

// Update the indixes and metrics
meta := newBlobTxMeta(id, p.store.Size(id), tx)
if _, ok := p.index[addr]; !ok {
if err := p.reserve(addr, true); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
return
return err
}
p.index[addr] = []*blobTxMeta{meta}
p.spent[addr] = meta.costCap
Expand All @@ -960,6 +968,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
}
p.lookup[meta.hash] = meta.id
p.stored += uint64(meta.size)
return nil
}

// SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements
Expand Down Expand Up @@ -1154,9 +1163,19 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
errs := make([]error, len(txs))
var (
adds = make([]*types.Transaction, 0, len(txs))
errs = make([]error, len(txs))
)
for i, tx := range txs {
errs[i] = p.add(tx)
if errs[i] == nil {
adds = append(adds, tx.WithoutBlobTxSidecar())
}
}
if len(adds) > 0 {
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
}
return errs
}
Expand Down Expand Up @@ -1384,6 +1403,8 @@ func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTr
Time: time.Now(), // TODO(karalabe): Maybe save these and use that?
GasFeeCap: tx.execFeeCap.ToBig(),
GasTipCap: tx.execTipCap.ToBig(),
Gas: tx.execGas,
BlobGas: tx.blobGas,
})
}
if len(lazies) > 0 {
Expand Down Expand Up @@ -1468,10 +1489,14 @@ func (p *BlobPool) updateLimboMetrics() {
limboSlotusedGauge.Update(int64(slotused))
}

// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return p.eventScope.Track(p.eventFeed.Subscribe(ch))
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
if reorgs {
return p.insertFeed.Subscribe(ch)
} else {
return p.discoverFeed.Subscribe(ch)
}
}

// Nonce returns the next nonce of an account, with all transactions executable
Expand Down
18 changes: 10 additions & 8 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ type LegacyPool struct {
chain BlockChain
gasTip atomic.Pointer[big.Int]
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

Expand Down Expand Up @@ -404,9 +403,6 @@ func (pool *LegacyPool) loop() {

// Close terminates the transaction pool.
func (pool *LegacyPool) Close() error {
// Unsubscribe all subscriptions registered from txpool
pool.scope.Close()

// Terminate the pool reorger and return
close(pool.reorgShutdownCh)
pool.wg.Wait()
Expand All @@ -425,10 +421,14 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
<-wait
}

// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return pool.scope.Track(pool.txFeed.Subscribe(ch))
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
// The legacy pool has a very messed up internal shuffling, so it's kind of
// hard to separate newly discovered transaction from resurrected ones. This
// is because the new txs are added to the queue, resurrected ones too and
// reorgs run lazily, so separating the two would need a marker.
return pool.txFeed.Subscribe(ch)
}

// SetGasTip updates the minimum gas tip required by the transaction pool for a
Expand Down Expand Up @@ -552,6 +552,8 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L
Time: txs[i].Time(),
GasFeeCap: txs[i].GasFeeCap(),
GasTipCap: txs[i].GasTipCap(),
Gas: txs[i].Gas(),
BlobGas: txs[i].BlobGas(),
}
}
pending[addr] = lazies
Expand Down
19 changes: 16 additions & 3 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ import (
// enough for the miner and other APIs to handle large batches of transactions;
// and supports pulling up the entire transaction when really needed.
type LazyTransaction struct {
Pool SubPool // Transaction subpool to pull the real transaction up
Pool LazyResolver // Transaction resolver to pull the real transaction up
Hash common.Hash // Transaction hash to pull up if needed
Tx *types.Transaction // Transaction if already resolved

Time time.Time // Time when the transaction was first seen
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay

Gas uint64 // Amount of gas required by the transaction
BlobGas uint64 // Amount of blob gas required by the transaction
}

// Resolve retrieves the full transaction belonging to a lazy handle if it is still
Expand All @@ -48,6 +51,14 @@ func (ltx *LazyTransaction) Resolve() *types.Transaction {
return ltx.Tx
}

// LazyResolver is a minimal interface needed for a transaction pool to satisfy
// resolving lazy transactions. It's mostly a helper to avoid the entire sub-
// pool being injected into the lazy transaction.
type LazyResolver interface {
// Get returns a transaction if it is contained in the pool, or nil otherwise.
Get(hash common.Hash) *types.Transaction
}

// AddressReserver is passed by the main transaction pool to subpools, so they
// may request (and relinquish) exclusive access to certain addresses.
type AddressReserver func(addr common.Address, reserve bool) error
Expand Down Expand Up @@ -99,8 +110,10 @@ type SubPool interface {
// account and sorted by nonce.
Pending(enforceTips bool) map[common.Address][]*LazyTransaction

// SubscribeTransactions subscribes to new transaction events.
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
Expand Down
12 changes: 7 additions & 5 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,15 @@ func (p *TxPool) Close() error {
if err := <-errc; err != nil {
errs = append(errs, err)
}

// Terminate each subpool
for _, subpool := range p.subpools {
if err := subpool.Close(); err != nil {
errs = append(errs, err)
}
}
// Unsubscribe anyone still listening for tx events
p.subs.Close()

if len(errs) > 0 {
return fmt.Errorf("subpool close errors: %v", errs)
}
Expand Down Expand Up @@ -316,12 +318,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction
return txs
}

// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending
// events to the given channel.
func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
// SubscribeTransactions registers a subscription for new transaction events,
// supporting feeding only newly seen or also resurrected transactions.
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
subs := make([]event.Subscription, len(p.subpools))
for i, subpool := range p.subpools {
subs[i] = subpool.SubscribeTransactions(ch)
subs[i] = subpool.SubscribeTransactions(ch, reorgs)
}
return p.subs.Track(event.JoinSubscriptions(subs...))
}
Expand Down
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
}

func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return b.eth.txPool.SubscribeNewTxsEvent(ch)
return b.eth.txPool.SubscribeTransactions(ch, true)
}

func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {
Expand Down
2 changes: 1 addition & 1 deletion eth/catalyst/simulated_beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal) error {
func (c *SimulatedBeacon) loopOnDemand() {
var (
newTxs = make(chan core.NewTxsEvent)
sub = c.eth.TxPool().SubscribeNewTxsEvent(newTxs)
sub = c.eth.TxPool().SubscribeTransactions(newTxs, true)
)
defer sub.Unsubscribe()

Expand Down
41 changes: 24 additions & 17 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ type txPool interface {
// The slice should be modifiable by the caller.
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction

// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
// SubscribeTransactions subscribes to new transaction events. The subscriber
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
}

// handlerConfig is the collection of initialization parameters to create a full
Expand Down Expand Up @@ -509,10 +510,10 @@ func (h *handler) unregisterPeer(id string) {
func (h *handler) Start(maxPeers int) {
h.maxPeers = maxPeers

// broadcast transactions
// broadcast and announce transactions (only new ones, not resurrected ones)
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
go h.txBroadcastLoop()

// broadcast mined blocks
Expand Down Expand Up @@ -592,26 +593,33 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
}

// BroadcastTransactions will propagate a batch of transactions
// - To a square root of all peers
// - To a square root of all peers for non-blob transactions
// - And, separately, as announcements to all peers which are not known to
// already have the given transaction.
func (h *handler) BroadcastTransactions(txs types.Transactions) {
var (
annoCount int // Count of announcements made
annoPeers int
directCount int // Count of the txs sent directly to peers
directPeers int // Count of the peers that were sent transactions directly
blobTxs int // Number of blob transactions to announce only
largeTxs int // Number of large transactions to announce only

directCount int // Number of transactions sent directly to peers (duplicates included)
directPeers int // Number of peers that were sent transactions directly
annCount int // Number of transactions announced across all peers (duplicates included)
annPeers int // Number of peers announced about transactions

txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce

)
// Broadcast transactions to a batch of peers not knowing about it
for _, tx := range txs {
peers := h.peers.peersWithoutTransaction(tx.Hash())

var numDirect int
if tx.Size() <= txMaxBroadcastSize {
switch {
case tx.Type() == types.BlobTxType:
blobTxs++
case tx.Size() > txMaxBroadcastSize:
largeTxs++
default:
numDirect = int(math.Sqrt(float64(len(peers))))
}
// Send the tx unconditionally to a subset of our peers
Expand All @@ -629,13 +637,12 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
peer.AsyncSendTransactions(hashes)
}
for peer, hashes := range annos {
annoPeers++
annoCount += len(hashes)
annPeers++
annCount += len(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
}
log.Debug("Transaction broadcast", "txs", len(txs),
"announce packs", annoPeers, "announced hashes", annoCount,
"tx packs", directPeers, "broadcast txs", directCount)
log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs,
"bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annPeers, "anncount", annCount)
}

// minedBroadcastLoop sends mined blocks to connected peers.
Expand Down

0 comments on commit a8a9c8e

Please sign in to comment.