Skip to content

Commit

Permalink
Hack pending blob tx retrieval into the miner for benchmarking
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Apr 3, 2023
1 parent 8ea65c5 commit 7eccdff
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 17 deletions.
56 changes: 54 additions & 2 deletions core/txpool/blobpool/blobpool.go
Expand Up @@ -125,6 +125,12 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
return meta
}

// BlobTxShim is an extremely tiny subset of a blob transaction that is used by
// the miner to sort and select transactions to include, requesting the needed
// data only for those transactions that will really get added to the next block.
type BlobTxShim struct {
}

// BlobPool is the transaction pool dedicated to EIP-4844 blob transactions.
//
// Blob transactions are special snowflakes that are designed for a very specific
Expand Down Expand Up @@ -199,7 +205,14 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
// viable because TPS is low (2-4 blobs per block initially, maybe 8-16 at
// peak), so natural churn is a couple MB per block. Replacements doing O(n)
// updates are forbidden and transaction propagation is pull based (i.e. no
// pilup of pending data).
// pileup of pending data).
//
// - When transactions are chosen for inclusion, the primary criteria is the
// signer tip (and having a basefee/data fee high enough of course). However,
// same-tip tranactions will be split by their basefee/datafee, prefering
// those that are closer to the current network limits. The idea being that
// very relaxed ones can be included even if the fees go up, when the closer
// ones could already be invalid.
//
// When the pool eventually reaches saturation, some old transactions - that may
// never execute - will need to be evicted in favor of newer ones. The eviction
Expand Down Expand Up @@ -289,7 +302,7 @@ type BlobPool struct {

index map[common.Address][]*blobTxMeta // Blob transactions groupped by accounts, sorted by nonce
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
evict *priceHeap // Heap of cheapest accounts for eviction when full
evict *evictHeap // Heap of cheapest accounts for eviction when full

lock sync.Mutex // Mutex protecting the pool during reorg handling
}
Expand Down Expand Up @@ -1198,6 +1211,45 @@ func (p *BlobPool) drop() {
}
}

// Pending retrieves a snapshot of the pending transactions for the miner to sift
// through and pick the best ones. The method already does a pre-filtering since
// there's no point to retrieve non-executble ones.
//
// Note, please don't provide an RPC API method to expose these. The blob pool will
// ideally get enormous, and it will not be feasible to constatly expose that entire
// dump out of the process.
func (p *BlobPool) Pending(basefee, datafee *uint256.Int) map[common.Address][]*BlobTxShim {
waitStart := time.Now()
p.lock.Lock()
pendwaitHist.Update(time.Since(waitStart).Nanoseconds())
defer p.lock.Unlock()

defer func(start time.Time) {
pendtimeHist.Update(time.Since(start).Nanoseconds())
}(time.Now())

pending := make(map[common.Address][]*BlobTxShim)
for addr, txs := range p.index {
var shims []*BlobTxShim
for _, tx := range txs {
// If the transaction cannot be executed in the current block, or does
// not meet teh minimum required tip, stop aggregating the account
if tx.execFeeCap.Lt(basefee) || tx.blobFeeCap.Lt(datafee) {
break
}
if new(uint256.Int).Sub(tx.execFeeCap, basefee).Lt(p.gasTip) {
break
}
// Transaction met teh minimum filters, att to the shims
shims = append(shims, &BlobTxShim{})
}
if len(shims) > 0 {
pending[addr] = shims
}
}
return pending
}

// updateStorageMetrics retrieves a bunch of stats from the data store and pushes
// them out as metrics.
func (p *BlobPool) updateStorageMetrics() {
Expand Down
Expand Up @@ -24,14 +24,15 @@ import (
"github.com/holiman/uint256"
)

// priceHeap is a helper data structure to keep track of the cheapest bottleneck
// evictHeap is a helper data structure to keep track of the cheapest bottleneck
// transaction from each account to determine which account to evict from.
//
// The heap internally tracks a slice of cheapest transactions from each account
// and a mapping from add
// and a mapping from addresses to indices for direct removals/udates.
//
// The goal of the heap is to decide which
type priceHeap struct {
// The goal of the heap is to decide which account has the worst bottleneck to
// evict transactions from.
type evictHeap struct {
metas *map[common.Address][]*blobTxMeta // Pointer to the blob pool's index for price retrievals

basefeeJumps float64 // Pre-calculated absolute dynamic fee jumps for the base fee
Expand All @@ -43,8 +44,8 @@ type priceHeap struct {

// newPriceHeap creates a new heap of cheapets accounts in the blob pool to evict
// from in case of over saturation.
func newPriceHeap(basefee *uint256.Int, blobfee *uint256.Int, index *map[common.Address][]*blobTxMeta) *priceHeap {
heap := &priceHeap{
func newPriceHeap(basefee *uint256.Int, blobfee *uint256.Int, index *map[common.Address][]*blobTxMeta) *evictHeap {
heap := &evictHeap{
metas: index,
index: make(map[common.Address]int),
}
Expand All @@ -58,7 +59,7 @@ func newPriceHeap(basefee *uint256.Int, blobfee *uint256.Int, index *map[common.

// reinit updates the pre-calculated dynamic fee jumps in the price heap and runs
// the sorting algorithm from scratch on the entire heap.
func (h *priceHeap) reinit(basefee *uint256.Int, blobfee *uint256.Int, force bool) {
func (h *evictHeap) reinit(basefee *uint256.Int, blobfee *uint256.Int, force bool) {
// If the update is mostly the same as the old, don't sort pointlessly
basefeeJumps := dynamicFeeJumps(basefee)
blobfeeJumps := dynamicFeeJumps(blobfee)
Expand All @@ -75,13 +76,13 @@ func (h *priceHeap) reinit(basefee *uint256.Int, blobfee *uint256.Int, force boo

// Len implements sort.Interface as part of heap.Interface, returning the number
// of accounts in the pool which can be considered for eviction.
func (h *priceHeap) Len() int {
func (h *evictHeap) Len() int {
return len(h.addrs)
}

// Less implements sort.Interface as part of heap.Interface, returning which of
// the two requested accounts has a cheaper bottleneck.
func (h *priceHeap) Less(i, j int) bool {
func (h *evictHeap) Less(i, j int) bool {
txsI := (*(h.metas))[h.addrs[i]]
txsJ := (*(h.metas))[h.addrs[j]]

Expand All @@ -105,14 +106,14 @@ func (h *priceHeap) Less(i, j int) bool {
// Swap implements sort.Interface as part of heap.Interface, maintaining both the
// order of the accounts according to the heap, and the account->item slot mapping
// for replacements.
func (h *priceHeap) Swap(i, j int) {
func (h *evictHeap) Swap(i, j int) {
h.index[h.addrs[i]], h.index[h.addrs[j]] = h.index[h.addrs[j]], h.index[h.addrs[i]]
h.addrs[i], h.addrs[j] = h.addrs[j], h.addrs[i]
}

// Push implements heap.Interface, appending an item to the end of the account
// ordering as well as the address to item slot mapping.
func (h *priceHeap) Push(x any) {
func (h *evictHeap) Push(x any) {
h.index[x.(common.Address)] = len(h.addrs)
h.addrs = append(h.addrs, x.(common.Address))
}
Expand All @@ -122,7 +123,7 @@ func (h *priceHeap) Push(x any) {
//
// Note, use `heap.Pop`, not `priceheap.Pop`. This method is used by Go's heap,
// to provide the functionality, it does not embed it.
func (h *priceHeap) Pop() any {
func (h *evictHeap) Pop() any {
// Remove the last element from the heap
size := len(h.addrs)
addr := h.addrs[size-1]
Expand Down
Expand Up @@ -28,7 +28,7 @@ import (

// verifyHeapInternals verifies that all accounts present in the index are also
// present in the heap and internals are consistent across various indices.
func verifyHeapInternals(t *testing.T, evict *priceHeap) {
func verifyHeapInternals(t *testing.T, evict *evictHeap) {
t.Helper()

// Ensure that all accounts are present in the heap and no extras
Expand Down
7 changes: 5 additions & 2 deletions core/txpool/blobpool/metrics.go
Expand Up @@ -64,10 +64,13 @@ var (
// the pool.
pooltipGague = metrics.NewRegisteredGauge("blobpool/pooltip", nil)

// addwait/time and resetwait/time track the rough health of the pool and
// wether or not it's capable of keeping up with the load from the network.
// addwait/time, resetwait/time and pendwait/time track the rough health of
// the pool and wether or not it's capable of keeping up with the load from
// the network.
addwaitHist = metrics.NewRegisteredHistogram("blobpool/addwait", nil, metrics.NewExpDecaySample(1028, 0.015))
addtimeHist = metrics.NewRegisteredHistogram("blobpool/addtime", nil, metrics.NewExpDecaySample(1028, 0.015))
resetwaitHist = metrics.NewRegisteredHistogram("blobpool/resetwait", nil, metrics.NewExpDecaySample(1028, 0.015))
resettimeHist = metrics.NewRegisteredHistogram("blobpool/resettime", nil, metrics.NewExpDecaySample(1028, 0.015))
pendwaitHist = metrics.NewRegisteredHistogram("blobpool/pendwait", nil, metrics.NewExpDecaySample(1028, 0.015))
pendtimeHist = metrics.NewRegisteredHistogram("blobpool/pendtime", nil, metrics.NewExpDecaySample(1028, 0.015))
)
1 change: 1 addition & 0 deletions eth/backend.go
Expand Up @@ -481,6 +481,7 @@ func (s *Ethereum) Miner() *miner.Miner { return s.miner }
func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager }
func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain }
func (s *Ethereum) TxPool() *txpool.TxPool { return s.txPool }
func (s *Ethereum) BlobPool() *blobpool.BlobPool { return s.blobPool }
func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux }
func (s *Ethereum) Engine() consensus.Engine { return s.engine }
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
Expand Down
2 changes: 2 additions & 0 deletions miner/miner.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/event"
Expand All @@ -41,6 +42,7 @@ import (
type Backend interface {
BlockChain() *core.BlockChain
TxPool() *txpool.TxPool
BlobPool() *blobpool.BlobPool
}

// Config is the configuration parameters of mining.
Expand Down
7 changes: 7 additions & 0 deletions miner/worker.go
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"
"github.com/holiman/uint256"
)

const (
Expand Down Expand Up @@ -1071,6 +1072,12 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) error {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(true)
blobtxs := w.eth.BlobPool().Pending(
uint256.MustFromBig(env.header.BaseFee),
uint256.MustFromBig(misc.CalcBlobFee(env.header.ExcessDataGas)),
)
log.Trace("Side-effect log, much wow", "blobs", len(blobtxs))

localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
for _, account := range w.eth.TxPool().Locals() {
if txs := remoteTxs[account]; len(txs) > 0 {
Expand Down

0 comments on commit 7eccdff

Please sign in to comment.