Skip to content

Commit

Permalink
core, core/rawdb, eth/sync: no tx indexing during snap sync (ethereum…
Browse files Browse the repository at this point in the history
…#28703)

This change simplifies the logic for indexing transactions and enhances the UX when transaction is not found by returning more information to users.

Transaction indexing is now considered as a part of the initial sync, and `eth.syncing` will thus be `true` if transaction indexing is not yet finished. API consumers can use the syncing status to determine if the node is ready to serve users.
  • Loading branch information
rjl493456442 authored and Dergarcon committed Jan 31, 2024
1 parent 6a67a51 commit aa814bd
Show file tree
Hide file tree
Showing 23 changed files with 446 additions and 365 deletions.
202 changes: 114 additions & 88 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,24 @@ func DefaultCacheConfigWithScheme(scheme string) *CacheConfig {
return &config
}

// txLookup is wrapper over transaction lookup along with the corresponding
// transaction object.
type txLookup struct {
lookup *rawdb.LegacyTxLookupEntry
transaction *types.Transaction
}

// TxIndexProgress is the struct describing the progress for transaction indexing.
type TxIndexProgress struct {
Indexed uint64 // number of blocks whose transactions are indexed
Remaining uint64 // number of blocks whose transactions are not indexed yet
}

// Done returns an indicator if the transaction indexing is finished.
func (prog TxIndexProgress) Done() bool {
return prog.Remaining == 0
}

// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
Expand Down Expand Up @@ -242,15 +260,18 @@ type BlockChain struct {
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue]
receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
blockCache *lru.Cache[common.Hash, *types.Block]
txLookupCache *lru.Cache[common.Hash, *rawdb.LegacyTxLookupEntry]
txLookupCache *lru.Cache[common.Hash, txLookup]

// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]

wg sync.WaitGroup //
quit chan struct{} // shutdown signal, closed in Stop.
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing
wg sync.WaitGroup
quit chan struct{} // shutdown signal, closed in Stop.
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing

txIndexRunning bool // flag if the background tx indexer is activated
txIndexProgCh chan chan TxIndexProgress // chan for querying the progress of transaction indexing

engine consensus.Engine
validator Validator // Block and state validator interface
Expand Down Expand Up @@ -297,8 +318,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, *rawdb.LegacyTxLookupEntry](txLookupCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
txIndexProgCh: make(chan chan TxIndexProgress),
engine: engine,
vmConfig: vmConfig,
}
Expand Down Expand Up @@ -466,6 +488,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Start tx indexer/unindexer if required.
if txLookupLimit != nil {
bc.txLookupLimit = *txLookupLimit
bc.txIndexRunning = true

bc.wg.Add(1)
go bc.maintainTxIndex()
Expand Down Expand Up @@ -1155,14 +1178,13 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Ensure genesis is in ancients.
if first.NumberU64() == 1 {
if frozen, _ := bc.db.Ancients(); frozen == 0 {
b := bc.genesisBlock
td := bc.genesisBlock.Difficulty()
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td)
size += writeSize
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td)
if err != nil {
log.Error("Error writing genesis to ancients", "err", err)
return 0, err
}
size += writeSize
log.Info("Wrote genesis to ancients")
}
}
Expand All @@ -1176,44 +1198,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
size += writeSize
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
return 0, err
}

// Write tx indices if any condition is satisfied:
// * If user requires to reserve all tx indices(txlookuplimit=0)
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
// * If block number is large enough to be regarded as a recent block
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
//
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
// an external ancient database, during the setup, blockchain will start
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
// range. In this case, all tx indices of newly imported blocks should be
// generated.
batch := bc.db.NewBatch()
for i, block := range blockChain {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
} else if rawdb.ReadTxIndexTail(bc.db) != nil {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
}
stats.processed++

if batch.ValueSize() > ethdb.IdealBatchSize || i == len(blockChain)-1 {
size += int64(batch.ValueSize())
if err = batch.Write(); err != nil {
snapBlock := bc.CurrentSnapBlock().Number.Uint64()
if _, err := bc.db.TruncateHead(snapBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
}
batch.Reset()
}
}
size += writeSize

// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
Expand All @@ -1231,8 +1220,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}

// Delete block data from the main database.
batch.Reset()
canonHashes := make(map[common.Hash]struct{})
var (
batch = bc.db.NewBatch()
canonHashes = make(map[common.Hash]struct{})
)
for _, block := range blockChain {
canonHashes[block.Hash()] = struct{}{}
if block.NumberU64() == 0 {
Expand All @@ -1250,13 +1241,16 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := batch.Write(); err != nil {
return 0, err
}
stats.processed += int32(len(blockChain))
return 0, nil
}

// writeLive writes blockchain and corresponding receipt chain into active store.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
skipPresenceCheck := false
batch := bc.db.NewBatch()
var (
skipPresenceCheck = false
batch = bc.db.NewBatch()
)
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if bc.insertStopped() {
Expand All @@ -1281,11 +1275,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed

// Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts,
// tx indexes)
// we can ensure all components of body is completed(body, receipts)
// except transaction indexes(will be created once sync is finished).
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
Expand Down Expand Up @@ -1317,19 +1310,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return n, err
}
}
// Write the tx index tail (block number from where we index) before write any live blocks
if len(liveBlocks) > 0 && liveBlocks[0].NumberU64() == ancientLimit+1 {
// The tx index tail can only be one of the following two options:
// * 0: all ancient blocks have been indexed
// * ancient-limit: the indices of blocks before ancient-limit are ignored
if tail := rawdb.ReadTxIndexTail(bc.db); tail == nil {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit {
rawdb.WriteTxIndexTail(bc.db, 0)
} else {
rawdb.WriteTxIndexTail(bc.db, ancientLimit-bc.txLookupLimit)
}
}
}
if len(liveBlocks) > 0 {
if n, err := writeLive(liveBlocks, liveReceipts); err != nil {
if err == errInsertionInterrupted {
Expand All @@ -1338,13 +1318,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return n, err
}
}

head := blockChain[len(blockChain)-1]
context := []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
"size", common.StorageSize(size),
}
var (
head = blockChain[len(blockChain)-1]
context = []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
"size", common.StorageSize(size),
}
)
if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...)
}
Expand All @@ -1360,7 +1341,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
if bc.insertStopped() {
return errInsertionInterrupted
}

batch := bc.db.NewBatch()
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
rawdb.WriteBlock(batch, block)
Expand Down Expand Up @@ -2427,23 +2407,24 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
defer func() { close(done) }()

// If head is 0, it means the chain is just initialized and no blocks are inserted,
// so don't need to indexing anything.
// If head is 0, it means the chain is just initialized and no blocks are
// inserted, so don't need to index anything.
if head == 0 {
return
}

// The tail flag is not existent, it means the node is just initialized
// and all blocks(may from ancient store) are not indexed yet.
// and all blocks in the chain (part of them may from ancient store) are
// not indexed yet, index the chain according to the configuration then.
if tail == nil {
from := uint64(0)
if bc.txLookupLimit != 0 && head >= bc.txLookupLimit {
from = head - bc.txLookupLimit + 1
}
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit)
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit, true)
return
}
// The tail flag is existent, but the whole chain is required to be indexed.
// The tail flag is existent (which means indexes in [tail, head] should be
// present), while the whole chain are requested for indexing.
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
if *tail > 0 {
// It can happen when chain is rewound to a historical point which
Expand All @@ -2453,17 +2434,58 @@ func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{})
if end > head+1 {
end = head + 1
}
rawdb.IndexTransactions(bc.db, 0, end, bc.quit)
rawdb.IndexTransactions(bc.db, 0, end, bc.quit, true)
}
return
}
// Update the transaction index to the new chain state
// The tail flag is existent, adjust the index range according to configuration
// and latest head.
if head-bc.txLookupLimit+1 < *tail {
// Reindex a part of missing indices and rewind index tail to HEAD-limit
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit)
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit, true)
} else {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit)
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit, false)
}
}

// reportTxIndexProgress returns the tx indexing progress.
func (bc *BlockChain) reportTxIndexProgress(head uint64) TxIndexProgress {
var (
remaining uint64
tail = rawdb.ReadTxIndexTail(bc.db)
)
total := bc.txLookupLimit
if bc.txLookupLimit == 0 {
total = head + 1 // genesis included
}
var indexed uint64
if tail != nil {
indexed = head - *tail + 1
}
// The value of indexed might be larger than total if some blocks need
// to be unindexed, avoiding a negative remaining.
if indexed < total {
remaining = total - indexed
}
return TxIndexProgress{
Indexed: indexed,
Remaining: remaining,
}
}

// TxIndexProgress retrieves the tx indexing progress, or an error if the
// background tx indexer is not activated or already stopped.
func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
if !bc.txIndexRunning {
return TxIndexProgress{}, errors.New("tx indexer is not activated")
}
ch := make(chan TxIndexProgress, 1)
select {
case bc.txIndexProgCh <- ch:
return <-ch, nil
case <-bc.quit:
return TxIndexProgress{}, errors.New("blockchain is closed")
}
}

Expand All @@ -2482,8 +2504,9 @@ func (bc *BlockChain) maintainTxIndex() {

// Listening to chain events and manipulate the transaction indexes.
var (
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
)
sub := bc.SubscribeChainHeadEvent(headCh)
if sub == nil {
Expand All @@ -2492,23 +2515,26 @@ func (bc *BlockChain) maintainTxIndex() {
defer sub.Unsubscribe()
log.Info("Initialized transaction indexer", "limit", bc.TxLookupLimit())

// Launch the initial processing if chain is not empty. This step is
// useful in these scenarios that chain has no progress and indexer
// is never triggered.
if head := rawdb.ReadHeadBlock(bc.db); head != nil {
// Launch the initial processing if chain is not empty (head != genesis).
// This step is useful in these scenarios that chain has no progress and
// indexer is never triggered.
if head := rawdb.ReadHeadBlock(bc.db); head != nil && head.Number().Uint64() != 0 {
done = make(chan struct{})
lastHead = head.Number().Uint64()
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.NumberU64(), done)
}

for {
select {
case head := <-headCh:
if done == nil {
done = make(chan struct{})
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
}
lastHead = head.Block.NumberU64()
case <-done:
done = nil
case ch := <-bc.txIndexProgCh:
ch <- bc.reportTxIndexProgress(lastHead)
case <-bc.quit:
if done != nil {
log.Info("Waiting background transaction indexer to exit")
Expand Down

0 comments on commit aa814bd

Please sign in to comment.