Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/fetcher: allow underpriced transactions in after timeout #28097

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 21 additions & 16 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"sort"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -53,6 +53,9 @@ const (
// re-request them.
maxTxUnderpricedSetSize = 32768

// maxTxUnderpricedTimeout is the max time a transaction should be stuck in the underpriced set.
maxTxUnderpricedTimeout = int64(5 * time.Minute)

// txArriveTimeout is the time allowance before an announced transaction is
// explicitly requested.
txArriveTimeout = 500 * time.Millisecond
Expand Down Expand Up @@ -148,7 +151,7 @@ type TxFetcher struct {
drop chan *txDrop
quit chan struct{}

underpriced mapset.Set[common.Hash] // Transactions discarded as too cheap (don't re-fetch)
underpriced *lru.Cache[common.Hash, int64] // Transactions discarded as too cheap (don't re-fetch)

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
Expand Down Expand Up @@ -202,7 +205,7 @@ func NewTxFetcherForTests(
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: mapset.NewSet[common.Hash](),
underpriced: lru.NewCache[common.Hash, int64](maxTxUnderpricedSetSize),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
Expand All @@ -223,17 +226,16 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// still valuable to check here because it runs concurrent to the internal
// loop, so anything caught here is time saved internally.
var (
unknowns = make([]common.Hash, 0, len(hashes))
duplicate, underpriced int64
unknowns = make([]common.Hash, 0, len(hashes))
duplicate int64
underpriced int64
)
for _, hash := range hashes {
switch {
case f.hasTx(hash):
duplicate++

case f.underpriced.Contains(hash):
case f.isKnownUnderpriced(hash):
underpriced++

default:
unknowns = append(unknowns, hash)
}
Expand All @@ -245,10 +247,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
if len(unknowns) == 0 {
return nil
}
announce := &txAnnounce{
origin: peer,
hashes: unknowns,
}
announce := &txAnnounce{origin: peer, hashes: unknowns}
select {
case f.notify <- announce:
return nil
Expand All @@ -257,6 +256,15 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
}
}

func (f *TxFetcher) isKnownUnderpriced(hash common.Hash) bool {
prevTime, ok := f.underpriced.Peek(hash)
if ok && prevTime+maxTxUnderpricedTimeout < time.Now().Unix() {
f.underpriced.Remove(hash)
return false
}
return ok
}

// Enqueue imports a batch of received transaction into the transaction pool
// and the fetcher. This method may be called by both transaction broadcasts and
// direct request replies. The differentiation is important so the fetcher can
Expand Down Expand Up @@ -300,10 +308,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
// Avoid re-request this transaction when we receive another
// announcement.
if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) {
for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize {
f.underpriced.Pop()
}
f.underpriced.Add(batch[j].Hash())
f.underpriced.Add(batch[j].Hash(), batch[j].Time().Unix())
}
// Track a few interesting failure types
switch {
Expand Down
4 changes: 2 additions & 2 deletions eth/fetcher/tx_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1509,8 +1509,8 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
}

case isUnderpriced:
if fetcher.underpriced.Cardinality() != int(step) {
t.Errorf("step %d: underpriced set size mismatch: have %d, want %d", i, fetcher.underpriced.Cardinality(), step)
if fetcher.underpriced.Len() != int(step) {
t.Errorf("step %d: underpriced set size mismatch: have %d, want %d", i, fetcher.underpriced.Len(), step)
}

default:
Expand Down