Skip to content

Commit

Permalink
core: move tx indexer to its own file (#28857)
Browse files Browse the repository at this point in the history
This change moves all the transaction indexing functions to a separate txindexer.go file and defines a txIndexer structure as a refactoring.
  • Loading branch information
rjl493456442 committed Jan 23, 2024
1 parent 542c861 commit 6b0de79
Show file tree
Hide file tree
Showing 6 changed files with 477 additions and 498 deletions.
178 changes: 7 additions & 171 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,17 +192,6 @@ type txLookup struct {
transaction *types.Transaction
}

// TxIndexProgress is the struct describing the progress for transaction indexing.
type TxIndexProgress struct {
Indexed uint64 // number of blocks whose transactions are indexed
Remaining uint64 // number of blocks whose transactions are not indexed yet
}

// Done returns an indicator if the transaction indexing is finished.
func (prog TxIndexProgress) Done() bool {
return prog.Remaining == 0
}

// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
Expand All @@ -229,13 +218,7 @@ type BlockChain struct {
flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state
triedb *trie.Database // The database handler for maintaining trie nodes.
stateCache state.Database // State database to reuse between imports (contains state cache)

// txLookupLimit is the maximum number of blocks from head whose tx indices
// are reserved:
// * 0: means no limit and regenerate any missing indexes
// * N: means N block limit [HEAD-N+1, HEAD] and delete extra indexes
// * nil: disable tx reindexer/deleter, but still index new blocks
txLookupLimit uint64
txIndexer *txIndexer // Transaction indexer, might be nil if not enabled

hc *HeaderChain
rmLogsFeed event.Feed
Expand Down Expand Up @@ -270,9 +253,6 @@ type BlockChain struct {
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing

txIndexRunning bool // flag if the background tx indexer is activated
txIndexProgCh chan chan TxIndexProgress // chan for querying the progress of transaction indexing

engine consensus.Engine
validator Validator // Block and state validator interface
prefetcher Prefetcher
Expand Down Expand Up @@ -320,7 +300,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
txIndexProgCh: make(chan chan TxIndexProgress),
engine: engine,
vmConfig: vmConfig,
}
Expand Down Expand Up @@ -485,13 +464,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}
// Start tx indexer/unindexer if required.
// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txLookupLimit = *txLookupLimit
bc.txIndexRunning = true

bc.wg.Add(1)
go bc.maintainTxIndex()
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}
Expand Down Expand Up @@ -981,7 +956,10 @@ func (bc *BlockChain) stopWithoutSaving() {
if !bc.stopping.CompareAndSwap(false, true) {
return
}

// Signal shutdown tx indexer.
if bc.txIndexer != nil {
bc.txIndexer.close()
}
// Unsubscribe all subscriptions registered from blockchain.
bc.scope.Close()

Expand Down Expand Up @@ -2403,148 +2381,6 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
return false
}

// indexBlocks reindexes or unindexes transactions depending on user configuration
func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
defer func() { close(done) }()

// If head is 0, it means the chain is just initialized and no blocks are
// inserted, so don't need to index anything.
if head == 0 {
return
}
// The tail flag is not existent, it means the node is just initialized
// and all blocks in the chain (part of them may from ancient store) are
// not indexed yet, index the chain according to the configuration then.
if tail == nil {
from := uint64(0)
if bc.txLookupLimit != 0 && head >= bc.txLookupLimit {
from = head - bc.txLookupLimit + 1
}
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit, true)
return
}
// The tail flag is existent (which means indexes in [tail, head] should be
// present), while the whole chain are requested for indexing.
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
if *tail > 0 {
// It can happen when chain is rewound to a historical point which
// is even lower than the indexes tail, recap the indexing target
// to new head to avoid reading non-existent block bodies.
end := *tail
if end > head+1 {
end = head + 1
}
rawdb.IndexTransactions(bc.db, 0, end, bc.quit, true)
}
return
}
// The tail flag is existent, adjust the index range according to configuration
// and latest head.
if head-bc.txLookupLimit+1 < *tail {
// Reindex a part of missing indices and rewind index tail to HEAD-limit
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit, true)
} else {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit, false)
}
}

// reportTxIndexProgress returns the tx indexing progress.
func (bc *BlockChain) reportTxIndexProgress(head uint64) TxIndexProgress {
var (
remaining uint64
tail = rawdb.ReadTxIndexTail(bc.db)
)
total := bc.txLookupLimit
if bc.txLookupLimit == 0 {
total = head + 1 // genesis included
}
var indexed uint64
if tail != nil {
indexed = head - *tail + 1
}
// The value of indexed might be larger than total if some blocks need
// to be unindexed, avoiding a negative remaining.
if indexed < total {
remaining = total - indexed
}
return TxIndexProgress{
Indexed: indexed,
Remaining: remaining,
}
}

// TxIndexProgress retrieves the tx indexing progress, or an error if the
// background tx indexer is not activated or already stopped.
func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
if !bc.txIndexRunning {
return TxIndexProgress{}, errors.New("tx indexer is not activated")
}
ch := make(chan TxIndexProgress, 1)
select {
case bc.txIndexProgCh <- ch:
return <-ch, nil
case <-bc.quit:
return TxIndexProgress{}, errors.New("blockchain is closed")
}
}

// maintainTxIndex is responsible for the construction and deletion of the
// transaction index.
//
// User can use flag `txlookuplimit` to specify a "recentness" block, below
// which ancient tx indices get deleted. If `txlookuplimit` is 0, it means
// all tx indices will be reserved.
//
// The user can adjust the txlookuplimit value for each launch after sync,
// Geth will automatically construct the missing indices or delete the extra
// indices.
func (bc *BlockChain) maintainTxIndex() {
defer bc.wg.Done()

// Listening to chain events and manipulate the transaction indexes.
var (
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
)
sub := bc.SubscribeChainHeadEvent(headCh)
if sub == nil {
return
}
defer sub.Unsubscribe()
log.Info("Initialized transaction indexer", "limit", bc.TxLookupLimit())

// Launch the initial processing if chain is not empty (head != genesis).
// This step is useful in these scenarios that chain has no progress and
// indexer is never triggered.
if head := rawdb.ReadHeadBlock(bc.db); head != nil && head.Number().Uint64() != 0 {
done = make(chan struct{})
lastHead = head.Number().Uint64()
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.NumberU64(), done)
}
for {
select {
case head := <-headCh:
if done == nil {
done = make(chan struct{})
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
}
lastHead = head.Block.NumberU64()
case <-done:
done = nil
case ch := <-bc.txIndexProgCh:
ch <- bc.reportTxIndexProgress(lastHead)
case <-bc.quit:
if done != nil {
log.Info("Waiting background transaction indexer to exit")
<-done
}
return
}
}
}

// reportBlock logs a bad block error.
func (bc *BlockChain) reportBlock(block *types.Block, receipts types.Receipts, err error) {
rawdb.WriteBadBlock(bc.db, block)
Expand Down
16 changes: 6 additions & 10 deletions core/blockchain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,16 +397,12 @@ func (bc *BlockChain) GetVMConfig() *vm.Config {
return &bc.vmConfig
}

// SetTxLookupLimit is responsible for updating the txlookup limit to the
// original one stored in db if the new mismatches with the old one.
func (bc *BlockChain) SetTxLookupLimit(limit uint64) {
bc.txLookupLimit = limit
}

// TxLookupLimit retrieves the txlookup limit used by blockchain to prune
// stale transaction indices.
func (bc *BlockChain) TxLookupLimit() uint64 {
return bc.txLookupLimit
// TxIndexProgress returns the transaction indexing progress.
func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
if bc.txIndexer == nil {
return TxIndexProgress{}, errors.New("tx indexer is not enabled")
}
return bc.txIndexer.txIndexProgress()
}

// TrieDB retrieves the low level trie database used for data storage.
Expand Down

0 comments on commit 6b0de79

Please sign in to comment.