Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/downloader: remove header rollback mechanism #28147

Merged
merged 2 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 2 additions & 53 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1280,41 +1280,13 @@ func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error {
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error {
// Keep a count of uncertain headers to roll back
var (
rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis)
rollbackErr error
mode = d.getMode()
mode = d.getMode()
gotHeaders = false // Wait for batches of headers to process
)
defer func() {
if rollback > 0 {
lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
if mode != LightSync {
lastFastBlock = d.blockchain.CurrentSnapBlock().Number
lastBlock = d.blockchain.CurrentBlock().Number
}
if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
// We're already unwinding the stack, only print the error to make it more visible
log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
}
curFastBlock, curBlock := common.Big0, common.Big0
if mode != LightSync {
curFastBlock = d.blockchain.CurrentSnapBlock().Number
curBlock = d.blockchain.CurrentBlock().Number
}
log.Warn("Rolled back chain segment",
"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
"snap", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
}
}()
// Wait for batches of headers to process
gotHeaders := false

for {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled

case task := <-d.headerProcCh:
Expand Down Expand Up @@ -1363,8 +1335,6 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
}
}
}
// Disable any rollback and return
rollback = 0
return nil
}
// Otherwise split the chunk of headers into batches and process them
Expand All @@ -1375,7 +1345,6 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
// Terminate if something failed in between processing chunks
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
default:
}
Expand Down Expand Up @@ -1422,29 +1391,11 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
}
if len(chunkHeaders) > 0 {
if n, err := d.lightchain.InsertHeaderChain(chunkHeaders); err != nil {
rollbackErr = err

// If some headers were inserted, track them as uncertain
if mode == SnapSync && n > 0 && rollback == 0 {
rollback = chunkHeaders[0].Number.Uint64()
}
log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
// All verifications passed, track all headers within the allowed limits
if mode == SnapSync {
head := chunkHeaders[len(chunkHeaders)-1].Number.Uint64()
if head-rollback > uint64(fsHeaderSafetyNet) {
rollback = head - uint64(fsHeaderSafetyNet)
} else {
rollback = 1
}
}
}
if len(rejected) != 0 {
// Merge threshold reached, stop importing, but don't roll back
rollback = 0

log.Info("Legacy sync reached merge threshold", "number", rejected[0].Number, "hash", rejected[0].Hash(), "td", td, "ttd", ttd)
return ErrMergeTransition
}
Expand All @@ -1455,15 +1406,13 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
select {
case <-d.cancelCh:
rollbackErr = errCanceled
return errCanceled
case <-time.After(time.Second):
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
if len(inserts) != len(chunkHeaders) {
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunkHeaders))
return fmt.Errorf("%w: stale headers", errBadPeer)
}
}
Expand Down
80 changes: 0 additions & 80 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,86 +878,6 @@ func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) {
assertOwnChain(t, tester, len(chain.blocks))
}

// Tests that upon detecting an invalid header, the recent ones are rolled back
// for various failure scenarios. Afterwards a full sync is attempted to make
// sure no state was corrupted.
func TestInvalidHeaderRollback66Snap(t *testing.T) { testInvalidHeaderRollback(t, eth.ETH66, SnapSync) }
func TestInvalidHeaderRollback67Snap(t *testing.T) { testInvalidHeaderRollback(t, eth.ETH67, SnapSync) }

func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) {
tester := newTester(t)
defer tester.terminate()

// Create a small enough block chain to download
targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
chain := testChainBase.shorten(targetBlocks)

// Attempt to sync with an attacker that feeds junk during the fast sync phase.
// This should result in the last fsHeaderSafetyNet headers being rolled back.
missing := fsHeaderSafetyNet + MaxHeaderFetch + 1

fastAttacker := tester.newPeer("fast-attack", protocol, chain.blocks[1:])
fastAttacker.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{}

if err := tester.sync("fast-attack", nil, mode); err == nil {
t.Fatalf("succeeded fast attacker synchronisation")
}
if head := tester.chain.CurrentHeader().Number.Int64(); int(head) > MaxHeaderFetch {
t.Errorf("rollback head mismatch: have %v, want at most %v", head, MaxHeaderFetch)
}
// Attempt to sync with an attacker that feeds junk during the block import phase.
// This should result in both the last fsHeaderSafetyNet number of headers being
// rolled back, and also the pivot point being reverted to a non-block status.
missing = 3*fsHeaderSafetyNet + MaxHeaderFetch + 1

blockAttacker := tester.newPeer("block-attack", protocol, chain.blocks[1:])
fastAttacker.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{} // Make sure the fast-attacker doesn't fill in
blockAttacker.withholdHeaders[chain.blocks[missing].Hash()] = struct{}{}

if err := tester.sync("block-attack", nil, mode); err == nil {
t.Fatalf("succeeded block attacker synchronisation")
}
if head := tester.chain.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
}
if mode == SnapSync {
if head := tester.chain.CurrentBlock().Number.Uint64(); head != 0 {
t.Errorf("fast sync pivot block #%d not rolled back", head)
}
}
// Attempt to sync with an attacker that withholds promised blocks after the
// fast sync pivot point. This could be a trial to leave the node with a bad
// but already imported pivot block.
withholdAttacker := tester.newPeer("withhold-attack", protocol, chain.blocks[1:])

tester.downloader.syncInitHook = func(uint64, uint64) {
for i := missing; i < len(chain.blocks); i++ {
withholdAttacker.withholdHeaders[chain.blocks[i].Hash()] = struct{}{}
}
tester.downloader.syncInitHook = nil
}
if err := tester.sync("withhold-attack", nil, mode); err == nil {
t.Fatalf("succeeded withholding attacker synchronisation")
}
if head := tester.chain.CurrentHeader().Number.Int64(); int(head) > 2*fsHeaderSafetyNet+MaxHeaderFetch {
t.Errorf("rollback head mismatch: have %v, want at most %v", head, 2*fsHeaderSafetyNet+MaxHeaderFetch)
}
if mode == SnapSync {
if head := tester.chain.CurrentBlock().Number.Uint64(); head != 0 {
t.Errorf("fast sync pivot block #%d not rolled back", head)
}
}
// Synchronise with the valid peer and make sure sync succeeds. Since the last rollback
// should also disable fast syncing for this process, verify that we did a fresh full
// sync. Note, we can't assert anything about the receipts since we won't purge the
// database of them, hence we can't use assertOwnChain.
tester.newPeer("valid", protocol, chain.blocks[1:])
if err := tester.sync("valid", nil, mode); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err)
}
assertOwnChain(t, tester, len(chain.blocks))
}

// Tests that a peer advertising a high TD doesn't get to stall the downloader
// afterwards by not sending any useful hashes.
func TestHighTDStarvationAttack66Full(t *testing.T) {
Expand Down