Skip to content

Commit

Permalink
Merge upstream PR ethereum#28171
Browse files Browse the repository at this point in the history
core, accounts, eth, trie: handle genesis state missing (ethereum#28171)

 Conflicts:
  core/blockchain.go
We made many changes in setHeadBeyondRoot. upstream removed assumption
that genesis state is always in the db. Accepted both changes
  • Loading branch information
tsahee committed Jan 25, 2024
2 parents 02ecac8 + 73f5bcb commit ed35443
Show file tree
Hide file tree
Showing 28 changed files with 260 additions and 295 deletions.
17 changes: 8 additions & 9 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ func (b *SimulatedBackend) CodeAt(ctx context.Context, contract common.Address,
if err != nil {
return nil, err
}

return stateDB.GetCode(contract), nil
}

Expand All @@ -212,7 +211,6 @@ func (b *SimulatedBackend) BalanceAt(ctx context.Context, contract common.Addres
if err != nil {
return nil, err
}

return stateDB.GetBalance(contract), nil
}

Expand All @@ -225,7 +223,6 @@ func (b *SimulatedBackend) NonceAt(ctx context.Context, contract common.Address,
if err != nil {
return 0, err
}

return stateDB.GetNonce(contract), nil
}

Expand All @@ -238,7 +235,6 @@ func (b *SimulatedBackend) StorageAt(ctx context.Context, contract common.Addres
if err != nil {
return nil, err
}

val := stateDB.GetState(contract, key)
return val[:], nil
}
Expand Down Expand Up @@ -700,8 +696,10 @@ func (b *SimulatedBackend) SendTransaction(ctx context.Context, tx *types.Transa
}
block.AddTxWithChain(b.blockchain, tx)
})
stateDB, _ := b.blockchain.State()

stateDB, err := b.blockchain.State()
if err != nil {
return err
}
b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)
b.pendingReceipts = receipts[0]
Expand Down Expand Up @@ -821,11 +819,12 @@ func (b *SimulatedBackend) AdjustTime(adjustment time.Duration) error {
blocks, _ := core.GenerateChain(b.config, block, ethash.NewFaker(), b.database, 1, func(number int, block *core.BlockGen) {
block.OffsetTime(int64(adjustment.Seconds()))
})
stateDB, _ := b.blockchain.State()

stateDB, err := b.blockchain.State()
if err != nil {
return err
}
b.pendingBlock = blocks[0]
b.pendingState, _ = state.New(b.pendingBlock.Root(), stateDB.Database(), nil)

return nil
}

Expand Down
1 change: 1 addition & 0 deletions cmd/devp2p/internal/ethtest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func setupGeth(stack *node.Node) error {
if err != nil {
return err
}
backend.SetSynced()

_, err = backend.BlockChain().InsertChain(chain.blocks[1:])
return err
Expand Down
54 changes: 19 additions & 35 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,17 +376,17 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
if err := bc.loadLastState(); err != nil {
return nil, err
}
// Make sure the state associated with the block is available
// Make sure the state associated with the block is available, or log out
// if there is no available state, waiting for state sync.
head := bc.CurrentBlock()
if !bc.HasState(head.Root) {
if head.Number.Uint64() <= bc.genesisBlock.NumberU64() {
// The genesis state is missing, which is only possible in the path-based
// scheme. This situation occurs when the state syncer overwrites it.
//
// The solution is to reset the state to the genesis state. Although it may not
// match the sync target, the state healer will later address and correct any
// inconsistencies.
bc.resetState()
// scheme. This situation occurs when the initial state sync is not finished
// yet, or the chain head is rewound below the pivot point. In both scenario,
// there is no possible recovery approach except for rerunning a snap sync.
// Do nothing here until the state syncer picks it up.
log.Info("Genesis state is missing, wait state sync")
} else {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
Expand Down Expand Up @@ -672,28 +672,6 @@ func (bc *BlockChain) SetSafe(header *types.Header) {
}
}

// resetState resets the persistent state to genesis state if it's not present.
func (bc *BlockChain) resetState() {
// Short circuit if the genesis state is already present.
root := bc.genesisBlock.Root()
if bc.HasState(root) {
return
}
// Reset the state database to empty for committing genesis state.
// Note, it should only happen in path-based scheme and Reset function
// is also only call-able in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(types.EmptyRootHash); err != nil {
log.Crit("Failed to clean state", "err", err) // Shouldn't happen
}
}
// Write genesis state into database.
if err := CommitGenesisState(bc.db, bc.triedb, bc.genesisBlock.Hash()); err != nil {
log.Crit("Failed to commit genesis state", "err", err)
}
log.Info("Reset state to genesis", "root", root)
}

// setHeadBeyondRoot rewinds the local chain to a new head with the extra condition
// that the rewind must pass the specified state root. The extra condition is
// ignored if it causes rolling back more than rewindLimit Gas (0 meaning infinte).
Expand Down Expand Up @@ -732,7 +710,6 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
if newHeadBlock == nil {
log.Error("Gap in the chain, rewinding to genesis", "number", header.Number, "hash", header.Hash())
newHeadBlock = bc.genesisBlock
bc.resetState()
} else {
// Block exists, keep rewinding until we find one with state,
// keeping rewinding until we exceed the optional threshold
Expand Down Expand Up @@ -784,16 +761,14 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
lastFullBlockHash = newHeadBlock.Hash()
}
if rootFound || newHeadBlock.NumberU64() <= bc.genesisBlock.NumberU64() {
if newHeadBlock.NumberU64() <= bc.genesisBlock.NumberU64() {
bc.resetState()
} else if !bc.HasState(newHeadBlock.Root()) {
if !bc.HasState(newHeadBlock.Root()) && bc.stateRecoverable(newHeadBlock.Root()) {
// Rewind to a block with recoverable state. If the state is
// missing, run the state recovery here.
if err := bc.triedb.Recover(newHeadBlock.Root()); err != nil {
log.Crit("Failed to rollback state", "err", err) // Shouldn't happen
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
}
log.Debug("Rewound to block with state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash())
break
}
log.Debug("Skipping block with threshold state", "number", newHeadBlock.NumberU64(), "hash", newHeadBlock.Hash(), "root", newHeadBlock.Root())
Expand All @@ -808,6 +783,15 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
// to low, so it's safe to update in-memory markers directly.
bc.currentBlock.Store(newHeadBlock.Header())
headBlockGauge.Update(int64(newHeadBlock.NumberU64()))

// The head state is missing, which is only possible in the path-based
// scheme. This situation occurs when the chain head is rewound below
// the pivot point. In this scenario, there is no possible recovery
// approach except for rerunning a snap sync. Do nothing here until the
// state syncer picks it up.
if !bc.HasState(newHeadBlock.Root()) {
log.Info("Chain is stateless, wait state sync", "number", newHeadBlock.Number(), "hash", newHeadBlock.Hash())
}
}
// Rewind the snap block in a simpleton way to the target head
if currentSnapBlock := bc.CurrentSnapBlock(); currentSnapBlock != nil && header.Number.Uint64() < currentSnapBlock.Number.Uint64() {
Expand Down Expand Up @@ -908,7 +892,7 @@ func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error {
// Reset the trie database with the fresh snap synced state.
root := block.Root()
if bc.triedb.Scheme() == rawdb.PathScheme {
if err := bc.triedb.Reset(root); err != nil {
if err := bc.triedb.Enable(root); err != nil {
return err
}
}
Expand Down
22 changes: 22 additions & 0 deletions core/rawdb/accessors_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,25 @@ func DeleteSkeletonHeader(db ethdb.KeyValueWriter, number uint64) {
log.Crit("Failed to delete skeleton header", "err", err)
}
}

const (
StateSyncUnknown = uint8(0) // flags the state snap sync is unknown
StateSyncRunning = uint8(1) // flags the state snap sync is not completed yet
StateSyncFinished = uint8(2) // flags the state snap sync is completed
)

// ReadSnapSyncStatusFlag retrieves the state snap sync status flag.
func ReadSnapSyncStatusFlag(db ethdb.KeyValueReader) uint8 {
blob, err := db.Get(snapSyncStatusFlagKey)
if err != nil || len(blob) != 1 {
return StateSyncUnknown
}
return blob[0]
}

// WriteSnapSyncStatusFlag stores the state snap sync status flag into database.
func WriteSnapSyncStatusFlag(db ethdb.KeyValueWriter, flag uint8) {
if err := db.Put(snapSyncStatusFlagKey, []byte{flag}); err != nil {
log.Crit("Failed to store sync status flag", "err", err)
}
}
2 changes: 1 addition & 1 deletion core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
lastPivotKey, fastTrieProgressKey, snapshotDisabledKey, SnapshotRootKey, snapshotJournalKey,
snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey,
persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey,
persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey,
} {
if bytes.Equal(key, meta) {
metadata.Add(size)
Expand Down
3 changes: 3 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ var (
// transitionStatusKey tracks the eth2 transition status.
transitionStatusKey = []byte("eth2-transition")

// snapSyncStatusFlagKey flags that status of snap sync.
snapSyncStatusFlagKey = []byte("SnapSyncStatus")

// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
Expand Down
6 changes: 6 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,13 @@ func (p *BlobPool) Init(gasTip *big.Int, head *types.Header, reserve txpool.Addr
return err
}
}
// Initialize the state with head block, or fallback to empty one in
// case the head state is not available(might occur when node is not
// fully synced).
state, err := p.chain.StateAt(head.Root)
if err != nil {
state, err = p.chain.StateAt(types.EmptyRootHash)
}
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/txpool/blobpool/evictheap.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type evictHeap struct {
index map[common.Address]int // Indices into the heap for replacements
}

// newPriceHeap creates a new heap of cheapets accounts in the blob pool to evict
// newPriceHeap creates a new heap of cheapest accounts in the blob pool to evict
// from in case of over saturation.
func newPriceHeap(basefee *uint256.Int, blobfee *uint256.Int, index *map[common.Address][]*blobTxMeta) *evictHeap {
heap := &evictHeap{
Expand Down
4 changes: 2 additions & 2 deletions core/txpool/blobpool/limbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (l *limbo) push(tx *types.Transaction, block uint64) error {
return errors.New("already tracked blob transaction")
}
if err := l.setAndIndex(tx, block); err != nil {
log.Error("Failed to set and index liboed blobs", "tx", tx, "err", err)
log.Error("Failed to set and index limboed blobs", "tx", tx, "err", err)
return err
}
return nil
Expand Down Expand Up @@ -191,7 +191,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
log.Trace("Blob transaction unchanged in limbo", "tx", txhash, "block", block)
return
}
// Retrieve the old blobs from the data store and write tehm back with a new
// Retrieve the old blobs from the data store and write them back with a new
// block number. IF anything fails, there's not much to do, go on.
item, err := l.getAndDrop(id)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/txpool/blobpool/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
var log2_1_125 = math.Log2(1.125)

// evictionPriority calculates the eviction priority based on the algorithm
// described in the BlobPool docs for a both fee components.
// described in the BlobPool docs for both fee components.
//
// This method takes about 8ns on a very recent laptop CPU, recalculating about
// 125 million transaction priority values per second.
Expand Down
2 changes: 1 addition & 1 deletion core/txpool/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,6 @@ var (
ErrOversizedData = errors.New("oversized data")

// ErrFutureReplacePending is returned if a future transaction replaces a pending
// transaction. Future transactions should only be able to replace other future transactions.
// one. Future transactions should only be able to replace other future transactions.
ErrFutureReplacePending = errors.New("future transaction tries to replace pending")
)
23 changes: 18 additions & 5 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,20 @@ func (pool *LegacyPool) Init(gasTip *big.Int, head *types.Header, reserve txpool

// Set the basic pool parameters
pool.gasTip.Store(gasTip)
pool.reset(nil, head)

// Initialize the state with head block, or fallback to empty one in
// case the head state is not available(might occur when node is not
// fully synced).
statedb, err := pool.chain.StateAt(head.Root)
if err != nil {
statedb, err = pool.chain.StateAt(types.EmptyRootHash)
}
if err != nil {
return err
}
pool.currentHead.Store(head)
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)

// Start the reorg loop early, so it can handle requests generated during
// journal loading.
Expand Down Expand Up @@ -406,7 +419,7 @@ func (pool *LegacyPool) Close() error {
}

// Reset implements txpool.SubPool, allowing the legacy pool's internal state to be
// kept in sync with the main transacion pool's internal state.
// kept in sync with the main transaction pool's internal state.
func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
wait := pool.requestReset(oldHead, newHead)
<-wait
Expand Down Expand Up @@ -637,7 +650,7 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction, local bool) error {
// pending or queued one, it overwrites the previous transaction if its price is higher.
//
// If a newly added transaction is marked as local, its sending account will be
// be added to the allowlist, preventing any associated transaction from being dropped
// added to the allowlist, preventing any associated transaction from being dropped
// out of the pool due to pricing constraints.
func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
// If the transaction is already known, discard it
Expand Down Expand Up @@ -901,7 +914,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
}

// addLocals enqueues a batch of transactions into the pool if they are valid, marking the
// senders as a local ones, ensuring they go around the local pricing constraints.
// senders as local ones, ensuring they go around the local pricing constraints.
//
// This method is used to add transactions from the RPC API and performs synchronous pool
// reorganization and event propagation.
Expand Down Expand Up @@ -943,7 +956,7 @@ func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
}

// Add enqueues a batch of transactions into the pool if they are valid. Depending
// on the local flag, full pricing contraints will or will not be applied.
// on the local flag, full pricing constraints will or will not be applied.
//
// If sync is set, the method will block until all internal maintenance related
// to the add is finished. Only use this during tests for determinism!
Expand Down
4 changes: 2 additions & 2 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type TxPool struct {
reservations map[common.Address]SubPool // Map with the account to pool reservations
reserveLock sync.Mutex // Lock protecting the account reservations

subs event.SubscriptionScope // Subscription scope to unscubscribe all on shutdown
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater
}

Expand Down Expand Up @@ -404,7 +404,7 @@ func (p *TxPool) Locals() []common.Address {
}

// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
// identified by its hash.
func (p *TxPool) Status(hash common.Hash) TxStatus {
for _, subpool := range p.subpools {
if status := subpool.Status(hash); status != TxStatusUnknown {
Expand Down
6 changes: 3 additions & 3 deletions core/txpool/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func ValidateTransaction(tx *types.Transaction, head *types.Header, signer types
if sidecar == nil {
return fmt.Errorf("missing sidecar in blob transaction")
}
// Ensure the number of items in the blob transaction and vairous side
// Ensure the number of items in the blob transaction and various side
// data match up before doing any expensive validations
hashes := tx.BlobHashes()
if len(hashes) == 0 {
Expand Down Expand Up @@ -183,7 +183,7 @@ type ValidationOptionsWithState struct {
// be rejected once the number of remaining slots reaches zero.
UsedAndLeftSlots func(addr common.Address) (int, int)

// ExistingExpenditure is a mandatory callback to retrieve the cummulative
// ExistingExpenditure is a mandatory callback to retrieve the cumulative
// cost of the already pooled transactions to check for overdrafts.
ExistingExpenditure func(addr common.Address) *big.Int

Expand Down Expand Up @@ -238,7 +238,7 @@ func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, op
return fmt.Errorf("%w: balance %v, queued cost %v, tx cost %v, overshot %v", core.ErrInsufficientFunds, balance, spent, cost, new(big.Int).Sub(need, balance))
}
// Transaction takes a new nonce value out of the pool. Ensure it doesn't
// overflow the number of permitted transactions from a single accoun
// overflow the number of permitted transactions from a single account
// (i.e. max cancellable via out-of-bound transaction).
if used, left := opts.UsedAndLeftSlots(from); left <= 0 {
return fmt.Errorf("%w: pooled %d txs", ErrAccountLimitExceeded, used)
Expand Down

0 comments on commit ed35443

Please sign in to comment.