Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/fetcher: throttle tx fetches to 128KB responses #28304

Merged
merged 2 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 50 additions & 32 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,22 @@ const (
// can announce in a short time.
maxTxAnnounces = 4096

// maxTxRetrievals is the maximum transaction number can be fetched in one
// request. The rationale to pick 256 is:
// - In eth protocol, the softResponseLimit is 2MB. Nowadays according to
// Etherscan the average transaction size is around 200B, so in theory
// we can include lots of transaction in a single protocol packet.
// - However the maximum size of a single transaction is raised to 128KB,
// so pick a middle value here to ensure we can maximize the efficiency
// of the retrieval and response size overflow won't happen in most cases.
// maxTxRetrievals is the maximum number of transactions that can be fetched
// in one request. The rationale for picking 256 is to have a reasonabe lower
// bound for the transferred data (don't waste RTTs, transfer more meaningful
// batch sizes), but also have an upper bound on the sequentiality to allow
// using our entire peerset for deliveries.
//
// This number also acts as a failsafe against malicious announces which might
// cause us to request more data than we'd expect.
maxTxRetrievals = 256

// maxTxRetrievalSize is the max number of bytes that delivered transactions
// should weigh according to the announcements. The 128KB was chosen to limit
// retrieving a maximum of one blob transaction at a time to minimize hogging
// a connection between two peers.
maxTxRetrievalSize = 128 * 1024

// maxTxUnderpricedSetSize is the size of the underpriced transaction set that
// is used to track recent transactions that have been dropped so we don't
// re-request them.
Expand Down Expand Up @@ -859,25 +865,36 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
if len(f.announces[peer]) == 0 {
return // continue in the for-each
}
hashes := make([]common.Hash, 0, maxTxRetrievals)
f.forEachHash(f.announces[peer], func(hash common.Hash) bool {
if _, ok := f.fetching[hash]; !ok {
// Mark the hash as fetching and stash away possible alternates
f.fetching[hash] = peer

if _, ok := f.alternates[hash]; ok {
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
}
f.alternates[hash] = f.announced[hash]
delete(f.announced, hash)
var (
hashes = make([]common.Hash, 0, maxTxRetrievals)
bytes uint64
)
f.forEachAnnounce(f.announces[peer], func(hash common.Hash, meta *txMetadata) bool {
karalabe marked this conversation as resolved.
Show resolved Hide resolved
// If the transaction is alcear fetching, skip to the next one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alcear 👀

if _, ok := f.fetching[hash]; ok {
return true
}
// Mark the hash as fetching and stash away possible alternates
f.fetching[hash] = peer

// Accumulate the hash and stop if the limit was reached
hashes = append(hashes, hash)
if len(hashes) >= maxTxRetrievals {
return false // break in the for-each
if _, ok := f.alternates[hash]; ok {
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
}
f.alternates[hash] = f.announced[hash]
delete(f.announced, hash)

// Accumulate the hash and stop if the limit was reached
hashes = append(hashes, hash)
if len(hashes) >= maxTxRetrievals {
return false // break in the for-each
}
if meta != nil { // Only set eth/68 and upwards
bytes += uint64(meta.size)
if bytes >= maxTxRetrievalSize {
return false
}
}
return true // continue in the for-each
return true // scheduled, try to add more
})
// If any hashes were allocated, request them from the peer
if len(hashes) > 0 {
Expand Down Expand Up @@ -922,27 +939,28 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))
}
}

// forEachHash does a range loop over a map of hashes in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) {
// forEachAnnounce does a range loop over a map of announcements in production,
// but during testing it does a deterministic sorted random to allow reproducing
// issues.
func (f *TxFetcher) forEachAnnounce(announces map[common.Hash]*txMetadata, do func(hash common.Hash, meta *txMetadata) bool) {
// If we're running production, use whatever Go's map gives us
if f.rand == nil {
for hash := range hashes {
if !do(hash) {
for hash, meta := range announces {
if !do(hash, meta) {
return
}
}
return
}
// We're running the test suite, make iteration deterministic
list := make([]common.Hash, 0, len(hashes))
for hash := range hashes {
list := make([]common.Hash, 0, len(announces))
for hash := range announces {
list = append(list, hash)
}
sortHashes(list)
rotateHashes(list, f.rand.Intn(len(list)))
for _, hash := range list {
if !do(hash) {
if !do(hash, announces[hash]) {
return
}
}
Expand Down
72 changes: 67 additions & 5 deletions eth/fetcher/tx_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
)

var (
Expand Down Expand Up @@ -993,15 +994,14 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
})
}

// Tests that if thousands of transactions are announces, only a small
// Tests that if thousands of transactions are announced, only a small
// number of them will be requested at a time.
func TestTransactionFetcherRateLimiting(t *testing.T) {
// Create a slew of transactions and to announce them
// Create a slew of transactions and announce them
var hashes []common.Hash
for i := 0; i < maxTxAnnounces; i++ {
hashes = append(hashes, common.Hash{byte(i / 256), byte(i % 256)})
}

testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
Expand Down Expand Up @@ -1029,6 +1029,68 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
})
}

// Tests that if huge transactions are announced, only a small number of them will
// be requested at a time, to keep the responses below a resonable level.
func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
// Announce mid size transactions from A to verify that multiple
// ones can be piled into a single request.
doTxNotify{peer: "A",
hashes: []common.Hash{{0x01}, {0x02}, {0x03}, {0x04}},
types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType, types.LegacyTxType},
sizes: []uint32{48 * 1024, 48 * 1024, 48 * 1024, 48 * 1024},
},
// Announce exactly on the limit transactions to see that only one
// gets requested
doTxNotify{peer: "B",
hashes: []common.Hash{{0x05}, {0x06}},
types: []byte{types.LegacyTxType, types.LegacyTxType},
sizes: []uint32{maxTxRetrievalSize, maxTxRetrievalSize},
},
// Announce oversized blob transactions to see that overflows are ok
doTxNotify{peer: "C",
hashes: []common.Hash{{0x07}, {0x08}},
types: []byte{types.BlobTxType, types.BlobTxType},
sizes: []uint32{params.MaxBlobGasPerBlock, params.MaxBlobGasPerBlock},
},
doWait{time: txArriveTimeout, step: true},
isWaiting(nil),
isScheduledWithMeta{
tracking: map[string][]announce{
"A": {
{common.Hash{0x01}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
{common.Hash{0x02}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
{common.Hash{0x03}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
{common.Hash{0x04}, typeptr(types.LegacyTxType), sizeptr(48 * 1024)},
},
"B": {
{common.Hash{0x05}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
{common.Hash{0x06}, typeptr(types.LegacyTxType), sizeptr(maxTxRetrievalSize)},
},
"C": {
{common.Hash{0x07}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
{common.Hash{0x08}, typeptr(types.BlobTxType), sizeptr(params.MaxBlobGasPerBlock)},
},
},
fetching: map[string][]common.Hash{
"A": {{0x02}, {0x03}, {0x04}},
"B": {{0x06}},
"C": {{0x08}},
},
},
},
})
}

// Tests that then number of transactions a peer is allowed to announce and/or
// request at the same time is hard capped.
func TestTransactionFetcherDoSProtection(t *testing.T) {
Expand Down Expand Up @@ -1664,7 +1726,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
t.Errorf("step %d, peer %s, hash %x: waitslot metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
}
}
}
Expand Down Expand Up @@ -1733,7 +1795,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
if (meta == nil && (ann.kind != nil || ann.size != nil)) ||
(meta != nil && (ann.kind == nil || ann.size == nil)) ||
(meta != nil && (meta.kind != *ann.kind || meta.size != *ann.size)) {
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size)
t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, *ann.kind, *ann.size)
}
}
}
Expand Down