From 7a684ebfb089e2f65fb9352244882231a47542a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 24 May 2023 11:20:09 +0300 Subject: [PATCH] consensus, core, eth/downloader: add 4844 excessDataGas validations --- consensus/beacon/consensus.go | 10 +- consensus/clique/clique.go | 2 +- consensus/ethash/consensus.go | 2 +- consensus/misc/eip1559.go | 4 +- consensus/misc/eip1559_test.go | 2 +- consensus/misc/eip4844.go | 55 +++++++ consensus/misc/eip4844_test.go | 38 +++++ core/block_validator.go | 32 +++- core/blockchain_insert.go | 6 +- core/blockchain_test.go | 2 +- core/types.go | 5 +- core/types/block.go | 13 +- eth/downloader/downloader.go | 28 ++-- eth/downloader/downloader_test.go | 4 +- eth/downloader/fetchers_concurrent_bodies.go | 11 +- .../fetchers_concurrent_receipts.go | 10 +- eth/downloader/queue.go | 149 ++++++++++-------- eth/downloader/queue_test.go | 31 ++-- eth/downloader/resultstore.go | 8 +- 19 files changed, 289 insertions(+), 123 deletions(-) diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index 75f1b65efa304..6288175c38cba 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -257,7 +257,7 @@ func (beacon *Beacon) verifyHeader(chain consensus.ChainHeaderReader, header, pa return consensus.ErrInvalidNumber } // Verify the header's EIP-1559 attributes. - if err := misc.VerifyEip1559Header(chain.Config(), parent, header); err != nil { + if err := misc.VerifyEIP1559Header(chain.Config(), parent, header); err != nil { return err } // Verify existence / non-existence of withdrawalsHash. @@ -270,12 +270,14 @@ func (beacon *Beacon) verifyHeader(chain consensus.ChainHeaderReader, header, pa } // Verify the existence / non-existence of excessDataGas cancun := chain.Config().IsCancun(header.Number, header.Time) - if cancun && header.ExcessDataGas == nil { - return errors.New("missing excessDataGas") - } if !cancun && header.ExcessDataGas != nil { return fmt.Errorf("invalid excessDataGas: have %d, expected nil", header.ExcessDataGas) } + if cancun { + if err := misc.VerifyEIP4844Header(parent, header); err != nil { + return err + } + } return nil } diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 01d27482da2ec..df9f86d7db60c 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -343,7 +343,7 @@ func (c *Clique) verifyCascadingFields(chain consensus.ChainHeaderReader, header if err := misc.VerifyGaslimit(parent.GasLimit, header.GasLimit); err != nil { return err } - } else if err := misc.VerifyEip1559Header(chain.Config(), parent, header); err != nil { + } else if err := misc.VerifyEIP1559Header(chain.Config(), parent, header); err != nil { // Verify the header's EIP-1559 attributes. return err } diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index b36637c5b9b9a..28e872910b678 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -254,7 +254,7 @@ func (ethash *Ethash) verifyHeader(chain consensus.ChainHeaderReader, header, pa if err := misc.VerifyGaslimit(parent.GasLimit, header.GasLimit); err != nil { return err } - } else if err := misc.VerifyEip1559Header(chain.Config(), parent, header); err != nil { + } else if err := misc.VerifyEIP1559Header(chain.Config(), parent, header); err != nil { // Verify the header's EIP-1559 attributes.
return err } diff --git a/consensus/misc/eip1559.go b/consensus/misc/eip1559.go index 4521b47b36e6b..18ae8d9a8c136 100644 --- a/consensus/misc/eip1559.go +++ b/consensus/misc/eip1559.go @@ -26,10 +26,10 @@ import ( "github.com/ethereum/go-ethereum/params" ) -// VerifyEip1559Header verifies some header attributes which were changed in EIP-1559, +// VerifyEIP1559Header verifies some header attributes which were changed in EIP-1559, // - gas limit check // - basefee check -func VerifyEip1559Header(config *params.ChainConfig, parent, header *types.Header) error { +func VerifyEIP1559Header(config *params.ChainConfig, parent, header *types.Header) error { // Verify that the gas limit remains within allowed bounds parentGasLimit := parent.GasLimit if !config.IsLondon(parent.Number) { diff --git a/consensus/misc/eip1559_test.go b/consensus/misc/eip1559_test.go index 1a9f96bc43262..59fded2cde709 100644 --- a/consensus/misc/eip1559_test.go +++ b/consensus/misc/eip1559_test.go @@ -95,7 +95,7 @@ func TestBlockGasLimits(t *testing.T) { BaseFee: initial, Number: big.NewInt(tc.pNum + 1), } - err := VerifyEip1559Header(config(), parent, header) + err := VerifyEIP1559Header(config(), parent, header) if tc.ok && err != nil { t.Errorf("test %d: Expected valid header: %s", i, err) } diff --git a/consensus/misc/eip4844.go b/consensus/misc/eip4844.go index 66ca9bd26d864..7bbc69471e216 100644 --- a/consensus/misc/eip4844.go +++ b/consensus/misc/eip4844.go @@ -17,8 +17,11 @@ package misc import ( + "errors" + "fmt" "math/big" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" ) @@ -27,7 +30,59 @@ var ( dataGaspriceUpdateFraction = big.NewInt(params.BlobTxDataGaspriceUpdateFraction) ) +// VerifyEIP4844Header verifies the presence of the excessDataGas field and that, +// if the current block contains no transactions, the excessDataGas is updated +// accordingly. +// +// We cannot verify excessDataGas if there *are* transactions included, as that +// would require the block body. However, it is nonetheless useful to verify the +// header in case of an empty body since certain code might skip body validations +// with no included transactions (e.g. snap sync). +func VerifyEIP4844Header(parent, header *types.Header) error { + // Verify the header is not malformed + if header.ExcessDataGas == nil { + return errors.New("header is missing excessDataGas") + } + // Verify the excessDataGas is correct based on the parent header iff the + // transaction list is empty. For non-empty blocks, validation needs to be + // done later. + if header.TxHash == types.EmptyTxsHash { + expectedExcessDataGas := CalcExcessDataGas(parent.ExcessDataGas, 0) + if header.ExcessDataGas.Cmp(expectedExcessDataGas) != 0 { + return fmt.Errorf("invalid excessDataGas: have %s, want %s, parentExcessDataGas %s, blob txs %d", + header.ExcessDataGas, expectedExcessDataGas, parent.ExcessDataGas, 0) + } + } + return nil +} + +// CalcExcessDataGas calculates the excess data gas after applying the set of +// blobs on top of the parent's excess data gas. +// +// Note, the excessDataGas is akin to gasUsed, in that it's calculated post-execution +// of the blob transactions, not before. Hence, the blob fee used to +// pay for the blobs is actually derived from the parent's excess data gas.
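+// +// To illustrate (mirroring the test cases below): starting from a nil or zero +// parent excess, including one blob more than the target leaves an excess of +// exactly params.BlobTxDataGasPerBlob, whereas staying at or below the target +// keeps the excess at zero.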
+func CalcExcessDataGas(parentExcessDataGas *big.Int, blobs int) *big.Int { + excessDataGas := new(big.Int) + if parentExcessDataGas != nil { + excessDataGas.Set(parentExcessDataGas) + } + consumed := big.NewInt(params.BlobTxDataGasPerBlob) + consumed.Mul(consumed, big.NewInt(int64(blobs))) + excessDataGas.Add(excessDataGas, consumed) + + targetGas := big.NewInt(params.BlobTxTargetDataGasPerBlock) + if excessDataGas.Cmp(targetGas) < 0 { + return new(big.Int) + } + return new(big.Int).Sub(excessDataGas, targetGas) +} + // CalcBlobFee calculates the blobfee from the header's excess data gas field. +// +// Note, the blob fee used to pay for blob transactions should be derived from +// the parent block's excess data gas, since it is a post-execution field akin +// to gas used. func CalcBlobFee(excessDataGas *big.Int) *big.Int { // If this block does not yet have EIP-4844 enabled, return the starting fee if excessDataGas == nil { diff --git a/consensus/misc/eip4844_test.go b/consensus/misc/eip4844_test.go index 5838cab8e669d..f7e23df7c3183 100644 --- a/consensus/misc/eip4844_test.go +++ b/consensus/misc/eip4844_test.go @@ -24,6 +24,44 @@ import ( "github.com/ethereum/go-ethereum/params" ) +func TestCalcExcessDataGas(t *testing.T) { + var tests = []struct { + parent int64 + blobs int + want int64 + }{ + // The excess data gas should not increase from zero if the used blob + // slots are below - or equal - to the target. + {0, 0, 0}, + {0, 1, 0}, + {0, params.BlobTxTargetDataGasPerBlock / params.BlobTxDataGasPerBlob, 0}, + + // If the target data gas is exceeded, the excessDataGas should increase + // by however much it was overshot + {0, (params.BlobTxTargetDataGasPerBlock / params.BlobTxDataGasPerBlob) + 1, params.BlobTxDataGasPerBlob}, + {1, (params.BlobTxTargetDataGasPerBlock / params.BlobTxDataGasPerBlob) + 1, params.BlobTxDataGasPerBlob + 1}, + {1, (params.BlobTxTargetDataGasPerBlock / params.BlobTxDataGasPerBlob) + 2, 2*params.BlobTxDataGasPerBlob + 1}, + + // The excess data gas should decrease by however much the target was + // under-shot, capped at zero. 
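+ // For instance, starting from a full target's worth of excess, including one blob fewer than the target shrinks the excess by one blob's data gas, and undershooting by more caps it at zero.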
+ {params.BlobTxTargetDataGasPerBlock, params.BlobTxTargetDataGasPerBlock / params.BlobTxDataGasPerBlob, params.BlobTxTargetDataGasPerBlock}, + {params.BlobTxTargetDataGasPerBlock, (params.BlobTxTargetDataGasPerBlock / params.BlobTxDataGasPerBlob) - 1, params.BlobTxDataGasPerBlob}, + {params.BlobTxTargetDataGasPerBlock, (params.BlobTxTargetDataGasPerBlock / params.BlobTxDataGasPerBlob) - 2, 0}, + {params.BlobTxDataGasPerBlob - 1, (params.BlobTxTargetDataGasPerBlock / params.BlobTxDataGasPerBlob) - 1, 0}, + } + for _, tt := range tests { + result := CalcExcessDataGas(big.NewInt(tt.parent), tt.blobs) + if result.Int64() != tt.want { + t.Errorf("excess data gas mismatch: have %v, want %v", result, tt.want) + } + } + // Test nil value for parent + result := CalcExcessDataGas(nil, (params.BlobTxTargetDataGasPerBlock/params.BlobTxDataGasPerBlob)+1) + if result.Int64() != params.BlobTxDataGasPerBlob { + t.Errorf("nil parent excess data gas mismatch: have %v, want %v", result, params.BlobTxDataGasPerBlob) + } +} + func TestCalcBlobFee(t *testing.T) { tests := []struct { excessDataGas int64 diff --git a/core/block_validator.go b/core/block_validator.go index bcb228830d4ea..cc1d613019715 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" ) @@ -49,12 +50,16 @@ func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engin // ValidateBody validates the given block's uncles and verifies the block // header's transaction and uncle roots. The headers are assumed to be already // validated at this point. -func (v *BlockValidator) ValidateBody(block *types.Block) error { +// +// Note, the need for the parent was introduced in Cancun, where the blob +// excess data gas field of the current block depends on the value in the +// parent block, but validating it also requires the transaction list. If the +// parent is nil but needed, the validator will attempt to load it from disk. +func (v *BlockValidator) ValidateBody(parent *types.Header, block *types.Block) error { // Check whether the block is already imported. if v.bc.HasBlockAndState(block.Hash(), block.NumberU64()) { return ErrKnownBlock } - // Header validity is known at this point. Here we verify that uncles, transactions // and withdrawals given in the block body match the header. header := block.Header() @@ -77,10 +82,29 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { return fmt.Errorf("withdrawals root hash mismatch (header value %x, calculated %x)", *header.WithdrawalsHash, hash) } } else if block.Withdrawals() != nil { - // Withdrawals are not allowed prior to shanghai fork + // Withdrawals are not allowed prior to Shanghai fork return fmt.Errorf("withdrawals present in block body") } - + // Blob transactions may be present after the Cancun fork, validate against + // the excessDataGas field. Note, the presence of the field itself must have + // already been validated when verifying the header; here we only check its value.
+ if header.ExcessDataGas != nil { + var blobs int + for _, tx := range block.Transactions() { + blobs += len(tx.BlobHashes()) + } + if blobs > params.BlobTxMaxDataGasPerBlock/params.BlobTxDataGasPerBlob { + return fmt.Errorf("exceeded block capacity (permitted %d, included %d)", params.BlobTxMaxDataGasPerBlock/params.BlobTxDataGasPerBlob, blobs) + } + if parent == nil { + if parent = v.bc.GetHeader(block.ParentHash(), block.NumberU64()-1); parent == nil { + return consensus.ErrUnknownAncestor + } + } + if excessDataGas := misc.CalcExcessDataGas(parent.ExcessDataGas, blobs); header.ExcessDataGas.Cmp(excessDataGas) != 0 { + return fmt.Errorf("excess data gas mismatch (header value %v, calculated %v)", header.ExcessDataGas, excessDataGas) + } + } if !v.bc.HasBlockAndState(block.ParentHash(), block.NumberU64()-1) { if !v.bc.HasBlock(block.ParentHash(), block.NumberU64()-1) { return consensus.ErrUnknownAncestor diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index 8f496e182c9e4..a89d8ece5ad64 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -121,7 +121,11 @@ func (it *insertIterator) next() (*types.Block, error) { return it.chain[it.index], it.errors[it.index] } // Block header valid, run body validation and return - return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index]) + var parent *types.Header + if it.index > 0 { + parent = it.chain[it.index-1].Header() + } + return it.chain[it.index], it.validator.ValidateBody(parent, it.chain[it.index]) } // peek returns the next block in the iterator, along with any potential validation diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 5ec685c99ff45..97d09bfe04ba2 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -150,7 +150,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { // Try and process the block err := blockchain.engine.VerifyHeader(blockchain, block.Header()) if err == nil { - err = blockchain.validator.ValidateBody(block) + err = blockchain.validator.ValidateBody(nil, block) } if err != nil { if err == ErrKnownBlock { diff --git a/core/types.go b/core/types.go index 36eb0d1dedbed..ab94efc93f213 100644 --- a/core/types.go +++ b/core/types.go @@ -28,8 +28,9 @@ import ( // is only responsible for validating block contents, as the header validation is // done by the specific consensus engines. type Validator interface { - // ValidateBody validates the given block's content. - ValidateBody(block *types.Block) error + // ValidateBody validates the given block's content. If the parent is nil + // but required, the validator should look it up on chain. + ValidateBody(parent *types.Header, block *types.Block) error // ValidateState validates the given statedb and optionally the receipts and // gas used. diff --git a/core/types/block.go b/core/types/block.go index a1a14f5b16579..87f7a866eb66f 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -152,10 +152,10 @@ func (h *Header) SanityCheck() error { // EmptyBody returns true if there is no additional 'body' to complete the header // that is: no transactions, no uncles and no withdrawals.
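+// +// Note, a header with a non-nil withdrawals hash is necessarily post-Shanghai +// and hence post-merge, where uncles are forbidden, so the uncle hash needs no +// explicit check in that branch.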
func (h *Header) EmptyBody() bool { - if h.WithdrawalsHash == nil { - return h.TxHash == EmptyTxsHash && h.UncleHash == EmptyUncleHash + if h.WithdrawalsHash != nil { + return h.TxHash == EmptyTxsHash && *h.WithdrawalsHash == EmptyWithdrawalsHash } - return h.TxHash == EmptyTxsHash && h.UncleHash == EmptyUncleHash && *h.WithdrawalsHash == EmptyWithdrawalsHash + return h.TxHash == EmptyTxsHash && h.UncleHash == EmptyUncleHash } // EmptyReceipts returns true if there are no receipts for this header/block. @@ -353,6 +353,13 @@ func (b *Block) Withdrawals() Withdrawals { return b.withdrawals } +func (b *Block) ExcessDataGas() *big.Int { + if b.header.ExcessDataGas == nil { + return nil + } + return new(big.Int).Set(b.header.ExcessDataGas) +} + func (b *Block) Header() *Header { return CopyHeader(b.header) } // Body returns the non-header content of the block. diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 9a805396c4d3b..ad7fe5a7f162a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -147,10 +147,10 @@ type Downloader struct { quitLock sync.Mutex // Lock to prevent double closes // Testing hooks - syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run - bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch - receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch - chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) + syncInitHook func(uint64, uint64) // Method to call upon initiating a new sync run + bodyFetchHook func([]*fetchTask) // Method to call upon starting a block body fetch + receiptFetchHook func([]*fetchTask) // Method to call upon starting a receipt fetch + chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) // Progress reporting metrics syncStartBlock uint64 // Head snap block when Geth was started @@ -166,6 +166,9 @@ type LightChain interface { // GetHeaderByHash retrieves a header from the local chain. GetHeaderByHash(common.Hash) *types.Header + // GetHeaderByNumber retrieves a header from the local canonical chain. + GetHeaderByNumber(uint64) *types.Header + // CurrentHeader retrieves the head header from the local chain. 
CurrentHeader() *types.Header @@ -614,6 +617,11 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } } } + parent := d.lightchain.GetHeaderByNumber(origin) + if parent == nil { + log.Error("Failed to retrieve just found common ancestor") + return fmt.Errorf("missing common ancestor #%d", origin) + } // Initiate the sync using a concurrent header and content retrieval algorithm d.queue.Prepare(origin+1, mode) if d.syncInitHook != nil { @@ -631,7 +639,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * headerFetcher, // Headers are always retrieved func() error { return d.fetchBodies(origin+1, beaconMode) }, // Bodies are retrieved during normal and snap sync func() error { return d.fetchReceipts(origin+1, beaconMode) }, // Receipts are retrieved during snap sync - func() error { return d.processHeaders(origin+1, td, ttd, beaconMode) }, + func() error { return d.processHeaders(parent, td, ttd, beaconMode) }, } if mode == SnapSync { d.pivotLock.Lock() @@ -1272,7 +1280,7 @@ func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error { // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. -func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode bool) error { +func (d *Downloader) processHeaders(parent *types.Header, td, ttd *big.Int, beaconMode bool) error { // Keep a count of uncertain headers to roll back var ( rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis) @@ -1454,20 +1462,20 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode } } // Otherwise insert the headers for content retrieval - inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin) + inserts := d.queue.Schedule(parent, chunkHeaders, chunkHashes) if len(inserts) != len(chunkHeaders) { rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunkHeaders)) return fmt.Errorf("%w: stale headers", errBadPeer) } } + parent = headers[limit-1] headers = headers[limit:] hashes = hashes[limit:] - origin += uint64(limit) } // Update the highest block number we know if a higher one is found. 
d.syncStatsLock.Lock() - if d.syncStatsChainHeight < origin { - d.syncStatsChainHeight = origin - 1 + if d.syncStatsChainHeight <= parent.Number.Uint64() { + d.syncStatsChainHeight = parent.Number.Uint64() } d.syncStatsLock.Unlock() diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 06c22afff092c..aa6709a814707 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -787,10 +787,10 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { // Instrument the downloader to signal body requests var bodiesHave, receiptsHave atomic.Int32 - tester.downloader.bodyFetchHook = func(headers []*types.Header) { + tester.downloader.bodyFetchHook = func(headers []*fetchTask) { bodiesHave.Add(int32(len(headers))) } - tester.downloader.receiptFetchHook = func(headers []*types.Header) { + tester.downloader.receiptFetchHook = func(headers []*fetchTask) { receiptsHave.Add(int32(len(headers))) } // Synchronise with the peer and make sure all blocks were retrieved diff --git a/eth/downloader/fetchers_concurrent_bodies.go b/eth/downloader/fetchers_concurrent_bodies.go index 9440972c6d70c..c47c70e1acb51 100644 --- a/eth/downloader/fetchers_concurrent_bodies.go +++ b/eth/downloader/fetchers_concurrent_bodies.go @@ -74,14 +74,13 @@ func (q *bodyQueue) unreserve(peer string) int { // request is responsible for converting a generic fetch request into a body // one and sending it to the remote peer for fulfillment. func (q *bodyQueue) request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error) { - peer.log.Trace("Requesting new batch of bodies", "count", len(req.Headers), "from", req.Headers[0].Number) + peer.log.Trace("Requesting new batch of bodies", "count", len(req.Tasks), "from", req.Tasks[0].header.Number) if q.bodyFetchHook != nil { - q.bodyFetchHook(req.Headers) + q.bodyFetchHook(req.Tasks) } - - hashes := make([]common.Hash, 0, len(req.Headers)) - for _, header := range req.Headers { - hashes = append(hashes, header.Hash()) + hashes := make([]common.Hash, 0, len(req.Tasks)) + for _, task := range req.Tasks { + hashes = append(hashes, task.header.Hash()) } return peer.peer.RequestBodies(hashes, resCh) } diff --git a/eth/downloader/fetchers_concurrent_receipts.go b/eth/downloader/fetchers_concurrent_receipts.go index 1c853c2184432..2e41b99f873af 100644 --- a/eth/downloader/fetchers_concurrent_receipts.go +++ b/eth/downloader/fetchers_concurrent_receipts.go @@ -74,13 +74,13 @@ func (q *receiptQueue) unreserve(peer string) int { // request is responsible for converting a generic fetch request into a receipt // one and sending it to the remote peer for fulfillment. 
func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error) { - peer.log.Trace("Requesting new batch of receipts", "count", len(req.Headers), "from", req.Headers[0].Number) + peer.log.Trace("Requesting new batch of receipts", "count", len(req.Tasks), "from", req.Tasks[0].header.Number) if q.receiptFetchHook != nil { - q.receiptFetchHook(req.Headers) + q.receiptFetchHook(req.Tasks) } - hashes := make([]common.Hash, 0, len(req.Headers)) - for _, header := range req.Headers { - hashes = append(hashes, header.Hash()) + hashes := make([]common.Hash, 0, len(req.Tasks)) + for _, task := range req.Tasks { + hashes = append(hashes, task.header.Hash()) } return peer.peer.RequestReceipts(hashes, resCh) } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index e9907297a0b93..065829f14f52f 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -28,9 +28,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/prque" + "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" ) const ( @@ -50,12 +52,19 @@ var ( errStaleDelivery = errors.New("stale delivery") ) +// fetchTask defines a single header to download, but since its result validation +// also depends on its parent after Cancun, the task bundles the parent in as well. +type fetchTask struct { + parent *types.Header // Parent of the header to fill, needed for validation + header *types.Header // Header to fill via the fetch task +} + // fetchRequest is a currently running data retrieval operation. type fetchRequest struct { - Peer *peerConnection // Peer to which the request was sent - From uint64 // Requested chain element index (used for skeleton fills only) - Headers []*types.Header // Requested headers, sorted by request order - Time time.Time // Time when the request was made + Peer *peerConnection // Peer to which the request was sent + From uint64 // Requested chain element index (used for skeleton fills only) + Tasks []*fetchTask // Requested header tasks, sorted by request order + Time time.Time // Time when the request was made } // fetchResult is a struct collecting partial results from data fetchers until @@ -127,15 +136,15 @@ type queue struct { headerContCh chan bool // Channel to notify when header download finishes // All data retrievals below are based on an already assembles header chain - blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers - blockTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the blocks (bodies) for - blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations - blockWakeCh chan bool // Channel to notify the block fetcher of new tasks + blockTaskPool map[common.Hash]*fetchTask // Pending block (body) retrieval tasks, mapping hashes to fetch tasks + blockTaskQueue *prque.Prque[int64, *fetchTask] // Priority queue of the headers to fetch the blocks (bodies) for + blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations + blockWakeCh chan bool // Channel to notify the block fetcher of new tasks - receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers - receiptTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers
to fetch the receipts for - receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations - receiptWakeCh chan bool // Channel to notify when receipt fetcher of new tasks + receiptTaskPool map[common.Hash]*fetchTask // Pending receipt retrieval tasks, mapping hashes to fetch tasks + receiptTaskQueue *prque.Prque[int64, *fetchTask] // Priority queue of the headers to fetch the receipts for + receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations + receiptWakeCh chan bool // Channel to notify the receipt fetcher of new tasks resultCache *resultStore // Downloaded but not yet delivered fetch results resultSize common.StorageSize // Approximate size of a block (exponential moving average) @@ -152,9 +161,9 @@ func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue { lock := new(sync.RWMutex) q := &queue{ headerContCh: make(chan bool, 1), - blockTaskQueue: prque.New[int64, *types.Header](nil), + blockTaskQueue: prque.New[int64, *fetchTask](nil), blockWakeCh: make(chan bool, 1), - receiptTaskQueue: prque.New[int64, *types.Header](nil), + receiptTaskQueue: prque.New[int64, *fetchTask](nil), receiptWakeCh: make(chan bool, 1), active: sync.NewCond(lock), lock: lock, @@ -174,11 +183,11 @@ func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) { q.headerHead = common.Hash{} q.headerPendPool = make(map[string]*fetchRequest) - q.blockTaskPool = make(map[common.Hash]*types.Header) + q.blockTaskPool = make(map[common.Hash]*fetchTask) q.blockTaskQueue.Reset() q.blockPendPool = make(map[string]*fetchRequest) - q.receiptTaskPool = make(map[common.Hash]*types.Header) + q.receiptTaskPool = make(map[common.Hash]*fetchTask) q.receiptTaskQueue.Reset() q.receiptPendPool = make(map[string]*fetchRequest) @@ -290,7 +299,7 @@ func (q *queue) RetrieveHeaders() ([]*types.Header, []common.Hash, int) { // Schedule adds a set of headers for the download queue for scheduling, returning // the new headers encountered.
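+// +// Note, each header is bundled with its parent into a fetchTask, since post-Cancun +// a downloaded body can only be validated against the parent's excessDataGas value.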
-func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uint64) []*types.Header { +func (q *queue) Schedule(parent *types.Header, headers []*types.Header, hashes []common.Hash) []*types.Header { q.lock.Lock() defer q.lock.Unlock() @@ -299,8 +308,8 @@ func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uin for i, header := range headers { // Make sure chain order is honoured and preserved throughout hash := hashes[i] - if header.Number == nil || header.Number.Uint64() != from { - log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", from) + if header.Number == nil || header.Number.Uint64() != parent.Number.Uint64()+1 { + log.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", parent.Number.Uint64()+1) break } if q.headerHead != (common.Hash{}) && q.headerHead != header.ParentHash { @@ -313,21 +322,25 @@ func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uin if _, ok := q.blockTaskPool[hash]; ok { log.Warn("Header already scheduled for block fetch", "number", header.Number, "hash", hash) } else { - q.blockTaskPool[hash] = header - q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) + task := &fetchTask{parent: parent, header: header} + + q.blockTaskPool[hash] = task + q.blockTaskQueue.Push(task, -int64(task.header.Number.Uint64())) } // Queue for receipt retrieval if q.mode == SnapSync && !header.EmptyReceipts() { if _, ok := q.receiptTaskPool[hash]; ok { log.Warn("Header already scheduled for receipt fetch", "number", header.Number, "hash", hash) } else { - q.receiptTaskPool[hash] = header - q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) + task := &fetchTask{parent: parent, header: header} + + q.receiptTaskPool[hash] = task + q.receiptTaskQueue.Push(task, -int64(task.header.Number.Uint64())) } } inserts = append(inserts, header) q.headerHead = hash - from++ + parent = header } return inserts } @@ -488,7 +501,7 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo // item - the fetchRequest // progress - whether any progress was made // throttle - if the caller should throttle for a while -func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header], +func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*fetchTask, taskQueue *prque.Prque[int64, *fetchTask], pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) { // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) @@ -499,27 +512,27 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common return nil, false, false } // Retrieve a batch of tasks, skipping previously failed ones - send := make([]*types.Header, 0, count) - skip := make([]*types.Header, 0) + send := make([]*fetchTask, 0, count) + skip := make([]*fetchTask, 0) progress := false throttled := false for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ { // the task queue will pop items in order, so the highest prio block // is also the lowest block number. - header, _ := taskQueue.Peek() + task, _ := taskQueue.Peek() // we can ask the resultcache if this header is within the // "prioritized" segment of blocks.
If it is not, we need to throttle - stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == SnapSync) + stale, throttle, item, err := q.resultCache.AddFetch(task, q.mode == SnapSync) if stale { // Don't put back in the task queue, this item has already been // delivered upstream taskQueue.PopItem() progress = true - delete(taskPool, header.Hash()) + delete(taskPool, task.header.Hash()) proc = proc - 1 - log.Error("Fetch reservation already delivered", "number", header.Number.Uint64()) + log.Error("Fetch reservation already delivered", "number", task.header.Number.Uint64()) continue } if throttle { @@ -538,7 +551,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common } if item.Done(kind) { // If it's a noop, we can skip this task - delete(taskPool, header.Hash()) + delete(taskPool, task.header.Hash()) taskQueue.PopItem() proc = proc - 1 progress = true @@ -547,15 +560,15 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common // Remove it from the task queue taskQueue.PopItem() // Otherwise unless the peer is known not to have the data, add to the retrieve list - if p.Lacks(header.Hash()) { - skip = append(skip, header) + if p.Lacks(task.header.Hash()) { + skip = append(skip, task) } else { - send = append(send, header) + send = append(send, task) } } // Merge all the skipped headers back - for _, header := range skip { - taskQueue.Push(header, -int64(header.Number.Uint64())) + for _, task := range skip { + taskQueue.Push(task, -int64(task.header.Number.Uint64())) } if q.resultCache.HasCompletedItems() { // Wake Results, resultCache was modified @@ -566,9 +579,9 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common return nil, progress, throttled } request := &fetchRequest{ - Peer: p, - Headers: send, - Time: time.Now(), + Peer: p, + Tasks: send, + Time: time.Now(), } pendPool[p.id] = request return request, progress, throttled @@ -586,14 +599,14 @@ func (q *queue) Revoke(peerID string) { delete(q.headerPendPool, peerID) } if request, ok := q.blockPendPool[peerID]; ok { - for _, header := range request.Headers { - q.blockTaskQueue.Push(header, -int64(header.Number.Uint64())) + for _, task := range request.Tasks { + q.blockTaskQueue.Push(task, -int64(task.header.Number.Uint64())) } delete(q.blockPendPool, peerID) } if request, ok := q.receiptPendPool[peerID]; ok { - for _, header := range request.Headers { - q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) + for _, task := range request.Tasks { + q.receiptTaskQueue.Push(task, -int64(task.header.Number.Uint64())) } delete(q.receiptPendPool, peerID) } @@ -651,10 +664,10 @@ func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue if req.From > 0 { taskQueue.(*prque.Prque[int64, uint64]).Push(req.From, -int64(req.From)) } - for _, header := range req.Headers { - taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64())) + for _, task := range req.Tasks { + taskQueue.(*prque.Prque[int64, *fetchTask]).Push(task, -int64(task.header.Number.Uint64())) } - return len(req.Headers) + return len(req.Tasks) } // DeliverHeaders injects a header retrieval response into the header results @@ -776,7 +789,7 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH q.lock.Lock() defer q.lock.Unlock() - validate := func(index int, header *types.Header) error { + validate := func(index int, parent, header *types.Header) error { if txListHashes[index] != 
header.TxHash { return errInvalidBody } @@ -796,6 +809,18 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH return errInvalidBody } } + if header.ExcessDataGas != nil { + var blobs int + for _, tx := range txLists[index] { + blobs += len(tx.BlobHashes()) + } + if blobs > params.BlobTxMaxDataGasPerBlock/params.BlobTxDataGasPerBlob { + return errInvalidBody + } + if excessDataGas := misc.CalcExcessDataGas(parent.ExcessDataGas, blobs); header.ExcessDataGas.Cmp(excessDataGas) != 0 { + return errInvalidBody + } + } return nil } @@ -816,7 +841,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt, recei q.lock.Lock() defer q.lock.Unlock() - validate := func(index int, header *types.Header) error { + validate := func(index int, parent, header *types.Header) error { if receiptListHashes[index] != header.ReceiptHash { return errInvalidReceipt } @@ -835,10 +860,10 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt, recei // Note, this method expects the queue lock to be already held for writing. The // reason this lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, - taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, +func (q *queue) deliver(id string, taskPool map[common.Hash]*fetchTask, + taskQueue *prque.Prque[int64, *fetchTask], pendPool map[string]*fetchRequest, reqTimer metrics.Timer, resInMeter metrics.Meter, resDropMeter metrics.Meter, - results int, validate func(index int, header *types.Header) error, + results int, validate func(index int, parent, header *types.Header) error, reconstruct func(index int, result *fetchResult)) (int, error) { // Short circuit if the data was never requested request := pendPool[id] @@ -853,8 +878,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, // If no data items were retrieved, mark them as unavailable for the origin peer if results == 0 { - for _, header := range request.Headers { - request.Peer.MarkLacking(header.Hash()) + for _, task := range request.Tasks { + request.Peer.MarkLacking(task.header.Hash()) } } // Assemble each of the results with their headers and retrieved data parts @@ -864,28 +889,28 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, i int hashes []common.Hash ) - for _, header := range request.Headers { + for _, task := range request.Tasks { // Short circuit assembly if no more fetch results are found if i >= results { break } // Validate the fields - if err := validate(i, header); err != nil { + if err := validate(i, task.parent, task.header); err != nil { failure = err break } - hashes = append(hashes, header.Hash()) + hashes = append(hashes, task.header.Hash()) i++ } - for _, header := range request.Headers[:i] { - if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale { + for _, task := range request.Tasks[:i] { + if res, stale, err := q.resultCache.GetDeliverySlot(task.header.Number.Uint64()); err == nil && !stale { reconstruct(accepted, res) } else { // else: between here and above, some other peer filled this result, // or it was indeed a no-op. 
This should not happen, but if it does it's // not something to panic about - log.Error("Delivery stale", "stale", stale, "number", header.Number.Uint64(), "err", err) + log.Error("Delivery stale", "stale", stale, "number", task.header.Number.Uint64(), "err", err) failure = errStaleDelivery } // Clean up a successful fetch @@ -895,8 +920,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, resDropMeter.Mark(int64(results - accepted)) // Return all failed or missing fetches to the queue - for _, header := range request.Headers[accepted:] { - taskQueue.Push(header, -int64(header.Number.Uint64())) + for _, task := range request.Tasks[accepted:] { + taskQueue.Push(task, -int64(task.header.Number.Uint64())) } // Wake up Results if accepted > 0 { diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index a8b1b45e006af..e3a546a592fca 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -110,7 +110,7 @@ func TestBasics(t *testing.T) { for i, header := range headers { hashes[i] = header.Hash() } - q.Schedule(headers, hashes, 1) + q.Schedule(testGenesis.Header(), headers, hashes) if q.Idle() { t.Errorf("queue should not be idle") } @@ -131,10 +131,10 @@ func TestBasics(t *testing.T) { t.Fatal("should throttle") } // But we should still get the first things to fetch - if got, exp := len(fetchReq.Headers), 5; got != exp { + if got, exp := len(fetchReq.Tasks), 5; got != exp { t.Fatalf("expected %d requests, got %d", exp, got) } - if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp { + if got, exp := fetchReq.Tasks[0].header.Number.Uint64(), uint64(1); got != exp { t.Fatalf("expected header %d, got %d", exp, got) } } @@ -154,7 +154,7 @@ func TestBasics(t *testing.T) { } // And not get any fetches at all, since it was throttled to begin with if fetchReq != nil { - t.Fatalf("should have no fetches, got %d", len(fetchReq.Headers)) + t.Fatalf("should have no fetches, got %d", len(fetchReq.Tasks)) } } if exp, got := q.blockTaskQueue.Size(), numOfBlocks-10; exp != got { @@ -173,10 +173,10 @@ func TestBasics(t *testing.T) { t.Fatal("should throttle") } // But we should still get the first things to fetch - if got, exp := len(fetchReq.Headers), 5; got != exp { + if got, exp := len(fetchReq.Tasks), 5; got != exp { t.Fatalf("expected %d requests, got %d", exp, got) } - if got, exp := fetchReq.Headers[0].Number.Uint64(), uint64(1); got != exp { + if got, exp := fetchReq.Tasks[0].header.Number.Uint64(), uint64(1); got != exp { t.Fatalf("expected header %d, got %d", exp, got) } } @@ -204,7 +204,7 @@ func TestEmptyBlocks(t *testing.T) { for i, header := range headers { hashes[i] = header.Hash() } - q.Schedule(headers, hashes, 1) + q.Schedule(testGenesis.Header(), headers, hashes) if q.Idle() { t.Errorf("queue should not be idle") } @@ -289,9 +289,14 @@ func XTestDelivery(t *testing.T) { hashes[i] = header.Hash() } l := len(headers) + + var parent = testGenesis.Header() + if c > 1 { + parent = world.headers(c - 1)[0] + } //fmt.Printf("scheduling %d headers, first %d last %d\n", // l, headers[0].Number.Uint64(), headers[len(headers)-1].Number.Uint64()) - q.Schedule(headers, hashes, uint64(c)) + q.Schedule(parent, headers, hashes) c += l } }() @@ -322,9 +327,9 @@ func XTestDelivery(t *testing.T) { txset [][]*types.Transaction uncleset [][]*types.Header ) - numToSkip := rand.Intn(len(f.Headers)) - for _, hdr := range f.Headers[0 : len(f.Headers)-numToSkip] { - txset = append(txset, world.getTransactions(hdr.Number.Uint64())) + 
numToSkip := rand.Intn(len(f.Tasks)) + for _, task := range f.Tasks[0 : len(f.Tasks)-numToSkip] { + txset = append(txset, world.getTransactions(task.header.Number.Uint64())) uncleset = append(uncleset, emptyList) } var ( @@ -357,8 +362,8 @@ func XTestDelivery(t *testing.T) { f, _, _ := q.ReserveReceipts(peer, rand.Intn(50)) if f != nil { var rcs [][]*types.Receipt - for _, hdr := range f.Headers { - rcs = append(rcs, world.getReceipts(hdr.Number.Uint64())) + for _, task := range f.Tasks { + rcs = append(rcs, world.getReceipts(task.header.Number.Uint64())) } hasher := trie.NewStackTrie(nil) hashes := make([]common.Hash, len(rcs)) diff --git a/eth/downloader/resultstore.go b/eth/downloader/resultstore.go index 7f7f5a89e264f..153d06195e233 100644 --- a/eth/downloader/resultstore.go +++ b/eth/downloader/resultstore.go @@ -20,8 +20,6 @@ import ( "fmt" "sync" "sync/atomic" - - "github.com/ethereum/go-ethereum/core/types" ) // resultStore implements a structure for maintaining fetchResults, tracking their @@ -76,17 +74,17 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 { // throttled - if true, the store is at capacity, this particular header is not prio now // item - the result to store data into // err - any error that occurred -func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) { +func (r *resultStore) AddFetch(task *fetchTask, fastSync bool) (stale, throttled bool, item *fetchResult, err error) { r.lock.Lock() defer r.lock.Unlock() var index int - item, index, stale, throttled, err = r.getFetchResult(header.Number.Uint64()) + item, index, stale, throttled, err = r.getFetchResult(task.header.Number.Uint64()) if err != nil || stale || throttled { return stale, throttled, item, err } if item == nil { - item = newFetchResult(header, fastSync) + item = newFetchResult(task.header, fastSync) r.items[index] = item } return stale, throttled, item, err
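For a sense of how the two new helpers compose, the following is a minimal sketch (illustrative only, not part of the patch) that walks a few hypothetical blocks, each packed with the maximum number of blobs, deriving every block's blob fee from its parent's excess data gas before rolling the excess forward:

package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/consensus/misc"
	"github.com/ethereum/go-ethereum/params"
)

func main() {
	// Maximum number of blobs a single block may carry.
	maxBlobs := params.BlobTxMaxDataGasPerBlock / params.BlobTxDataGasPerBlob

	excess := new(big.Int) // a pre-Cancun parent contributes no excess
	for block := 1; block <= 4; block++ {
		// The blob fee charged within a block derives from the parent's
		// excess data gas, since excessDataGas is a post-execution field.
		fee := misc.CalcBlobFee(excess)

		// Roll the excess forward, with the block packed full of blobs.
		excess = misc.CalcExcessDataGas(excess, maxBlobs)

		fmt.Printf("block %d: blobfee %v, excessDataGas %v\n", block, fee, excess)
	}
}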