Skip to content

Commit

Permalink
all: split out current txpool as one of many subpools
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe committed Jun 13, 2023
1 parent 828d64f commit 3d31324
Show file tree
Hide file tree
Showing 46 changed files with 2,798 additions and 2,192 deletions.
1 change: 0 additions & 1 deletion cmd/geth/main.go
Expand Up @@ -79,7 +79,6 @@ var (
utils.TxPoolLifetimeFlag,
utils.BlobPoolDataDirFlag,
utils.BlobPoolDataCapFlag,
utils.BlobPoolPriceLimitFlag,
utils.BlobPoolPriceBumpFlag,
utils.SyncModeFlag,
utils.SyncTargetFlag,
Expand Down
10 changes: 2 additions & 8 deletions cmd/utils/flags.go
Expand Up @@ -41,7 +41,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -398,12 +398,6 @@ var (
Value: ethconfig.Defaults.BlobPool.Datacap,
Category: flags.BlobPoolCategory,
}
BlobPoolPriceLimitFlag = &cli.Uint64Flag{
Name: "blobpool.pricelimit",
Usage: "Minimum gas price tip to enforce for acceptance into the blob pool",
Value: ethconfig.Defaults.BlobPool.PriceLimit,
Category: flags.BlobPoolCategory,
}
BlobPoolPriceBumpFlag = &cli.Uint64Flag{
Name: "blobpool.pricebump",
Usage: "Price bump percentage to replace an already existing blob transaction",
Expand Down Expand Up @@ -1504,7 +1498,7 @@ func setGPO(ctx *cli.Context, cfg *gasprice.Config, light bool) {
}
}

func setTxPool(ctx *cli.Context, cfg *txpool.Config) {
func setTxPool(ctx *cli.Context, cfg *legacypool.Config) {
if ctx.IsSet(TxPoolLocalsFlag.Name) {
locals := strings.Split(ctx.String(TxPoolLocalsFlag.Name), ",")
for _, account := range locals {
Expand Down
188 changes: 150 additions & 38 deletions core/txpool/blobpool/blobpool.go
Expand Up @@ -30,10 +30,12 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
Expand Down Expand Up @@ -307,93 +309,108 @@ type BlobPool struct {
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
evict *evictHeap // Heap of cheapest accounts for eviction when full

eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination

lock sync.Mutex // Mutex protecting the pool during reorg handling
}

// New creates a new blob transaction pool to gather, sort and filter inbound
// blob transactions from the network.
func New(config Config, chain BlockChain) (*BlobPool, error) {
func New(config Config, chain BlockChain) *BlobPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()

var (
queuedir = filepath.Join(config.Datadir, pendingTransactionStore)
limbodir = filepath.Join(config.Datadir, limboedTransactionStore)
)
// Create the transaction pool with its initial settings
head := chain.CurrentBlock()
state, err := chain.StateAt(head.Root)
if err != nil {
return nil, err
}
pool := &BlobPool{
return &BlobPool{
config: config,
signer: types.LatestSigner(chain.Config()),
chain: chain,
head: head,
state: state,
index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int),
}
}

// Filter returns whether the given transaction can be consumed by the blob pool.
func (p *BlobPool) Filter(tx *types.Transaction) bool {
return tx.Type() == types.BlobTxType
}

// Init sets the gas price needed to keep a transaction in the pool and the chain
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings.
func (p *BlobPool) Init(gasTip *big.Int, head *types.Header) error {
var (
queuedir = filepath.Join(p.config.Datadir, pendingTransactionStore)
limbodir = filepath.Join(p.config.Datadir, limboedTransactionStore)
)
state, err := p.chain.StateAt(head.Root)
if err != nil {
return err
}
p.head, p.state = head, state

// Index all transactions on disk and delete anything inprocessable
var fails []uint64
index := func(id uint64, size uint32, blob []byte) {
if pool.parseTransaction(id, size, blob) != nil {
if p.parseTransaction(id, size, blob) != nil {
fails = append(fails, id)
}
}
if err := os.MkdirAll(queuedir, 0700); err != nil {
return nil, err
return err
}
store, err := billy.Open(billy.Options{Path: queuedir}, newSlotter(), index)
if err != nil {
return nil, err
return err
}
pool.store = store
p.store = store

if len(fails) > 0 {
log.Warn("Dropping invalidated blob transactions", "ids", fails)
for _, id := range fails {
if err := pool.store.Delete(id); err != nil {
pool.Close()
return nil, err
if err := p.store.Delete(id); err != nil {
p.Close()
return err
}
}
}
// Sort the indexed transactions by nonce and delete anything gapped, create
// the eviction heap of anyone still standing
for addr, _ := range pool.index {
pool.recheck(addr, nil)
for addr, _ := range p.index {
p.recheck(addr, nil)
}
var (
basefee = uint256.MustFromBig(misc.CalcBaseFee(chain.Config(), pool.head))
blobfee = uint256.MustFromBig(misc.CalcBlobFee(*pool.head.ExcessDataGas))
basefee = uint256.MustFromBig(misc.CalcBaseFee(p.chain.Config(), p.head))
blobfee = uint256.MustFromBig(big.NewInt(params.BlobTxMinDataGasprice))
)
pool.evict = newPriceHeap(basefee, blobfee, &pool.index)
if p.head.ExcessDataGas != nil {
blobfee = uint256.MustFromBig(misc.CalcBlobFee(*p.head.ExcessDataGas))
}
p.evict = newPriceHeap(basefee, blobfee, &p.index)

// Pool initialized, attach the blob limbo to it to track blobs included
// recently but not yet finalized
pool.limbo, err = newLimbo(limbodir)
p.limbo, err = newLimbo(limbodir)
if err != nil {
pool.Close()
return nil, err
p.Close()
return err
}
// Set the configured gas tip, triggering a filtering of anything just loaded
basefeeGauge.Update(int64(basefee.Uint64()))
blobfeeGauge.Update(int64(blobfee.Uint64()))

pool.SetGasTip(new(big.Int).SetUint64(config.PriceLimit))
p.SetGasTip(gasTip)

// Since the user might have modified their pool's capacity, evict anything
// above the current allowance
for pool.stored > pool.config.Datacap {
pool.drop()
for p.stored > p.config.Datacap {
p.drop()
}
// Updat the metrics and return the cosntructed pool
datacapGauge.Update(int64(pool.config.Datacap))
pool.updateStorageMetrics()

return pool, nil
datacapGauge.Update(int64(p.config.Datacap))
p.updateStorageMetrics()
return nil
}

// Close closes down the underlying persistent store.
Expand All @@ -405,6 +422,8 @@ func (p *BlobPool) Close() error {
if err := p.store.Close(); err != nil {
errs = append(errs, err)
}
p.eventScope.Close()

switch {
case errs == nil:
return nil
Expand Down Expand Up @@ -688,8 +707,11 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
// Reset the price heap for the new set of basefee/blobfee pairs
var (
basefee = uint256.MustFromBig(misc.CalcBaseFee(p.chain.Config(), newHead))
blobfee = uint256.MustFromBig(misc.CalcBlobFee(*newHead.ExcessDataGas))
blobfee = uint256.MustFromBig(big.NewInt(params.BlobTxMinDataGasprice))
)
if newHead.ExcessDataGas != nil {
blobfee = uint256.MustFromBig(misc.CalcBlobFee(*newHead.ExcessDataGas))
}
p.evict.reinit(basefee, blobfee, false)

basefeeGauge.Update(int64(basefee.Uint64()))
Expand Down Expand Up @@ -978,9 +1000,30 @@ func (p *BlobPool) validateTx(tx *types.Transaction, blobs []kzg4844.Blob, commi
return nil
}

// Has returns an indicator whether subpool has a transaction cached with the
// given hash.
func (p *BlobPool) Has(hash common.Hash) bool {
return false
}

// Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *BlobPool) Get(hash common.Hash) *txpool.Transaction {
return nil
}

// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restictions).
func (p *BlobPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error {
errs := make([]error, len(txs))
for i, tx := range txs {
errs[i] = p.add(tx.Tx, tx.BlobTxBlobs, tx.BlobTxCommits, tx.BlobTxProofs)
}
return errs
}

// Add inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restictions).
func (p *BlobPool) Add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) error {
func (p *BlobPool) add(tx *types.Transaction, blobs []kzg4844.Blob, commits []kzg4844.Commitment, proofs []kzg4844.Proof) error {
// The blob pool blocks on adding a transaction. This is because blob txs are
// only even pulled form the network, so this method will act as the overload
// protection for fetches.
Expand Down Expand Up @@ -1159,6 +1202,13 @@ func (p *BlobPool) drop() {
}
}

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*types.Transaction {
return nil
}

/*
// Pending retrieves a snapshot of the pending transactions for the miner to sift
// through and pick the best ones. The method already does a pre-filtering since
// there's no point to retrieve non-executble ones.
Expand Down Expand Up @@ -1196,7 +1246,7 @@ func (p *BlobPool) Pending(basefee, datafee *uint256.Int) map[common.Address][]*
}
}
return pending
}
}*/

// updateStorageMetrics retrieves a bunch of stats from the data store and pushes
// them out as metrics.
Expand Down Expand Up @@ -1272,3 +1322,65 @@ func (p *BlobPool) updateLimboMetrics() {
limboDatarealGauge.Update(int64(datareal))
limboSlotusedGauge.Update(int64(slotused))
}

// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
return p.eventScope.Track(p.eventFeed.Subscribe(ch))
}

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
func (p *BlobPool) Nonce(addr common.Address) uint64 {
p.lock.Lock()
defer p.lock.Unlock()

if txs, ok := p.index[addr]; ok {
return txs[len(txs)-1].nonce + 1
}
return p.state.GetNonce(addr)
}

// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
func (p *BlobPool) Stats() (int, int) {
p.lock.Lock()
defer p.lock.Unlock()

var pending int
for _, txs := range p.index {
pending += len(txs)
}
return pending, 0 // No non-executable txs in the blob pool
}

// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and sorted by nonce.
//
// For the blob pool, this method will return nothing for now.
// TODO(karalabe): Abstract out the returned metadata.
func (p *BlobPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
return make(map[common.Address][]*types.Transaction), make(map[common.Address][]*types.Transaction)
}

// ContentFrom retrieves the data content of the transaction pool, returning the
// pending as well as queued transactions of this address, grouped by nonce.
//
// For the blob pool, this method will return nothing for now.
// TODO(karalabe): Abstract out the returned metadata.
func (p *BlobPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
return []*types.Transaction{}, []*types.Transaction{}
}

// Locals retrieves the accounts currently considered local by the pool.
//
// There is no notion of local accounts in the blob pool.
func (p *BlobPool) Locals() []common.Address {
return []common.Address{}
}

// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
return txpool.TxStatusUnknown
}

0 comments on commit 3d31324

Please sign in to comment.