Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth: enforce announcement metadatas and drop peers violating the protocol #28261

Merged
merged 5 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
127 changes: 96 additions & 31 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"errors"
"fmt"
"math"
mrand "math/rand"
"sort"
"time"
Expand Down Expand Up @@ -105,6 +106,14 @@ var (
type txAnnounce struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes being announced
metas []*txMetadata // Batch of metadatas associated with the hashes (nil before eth/68)
}

// txMetadata is a set of extra data transmitted along the announcement for better
// fetch scheduling.
type txMetadata struct {
kind byte // Transaction consensus type
size uint32 // Transaction size in bytes
}

// txRequest represents an in-flight transaction retrieval request destined to
Expand All @@ -120,6 +129,7 @@ type txRequest struct {
type txDelivery struct {
origin string // Identifier of the peer originating the notification
hashes []common.Hash // Batch of transaction hashes having been delivered
metas []txMetadata // Batch of metadatas associated with the delivered hashes
direct bool // Whether this is a direct reply or a broadcast
}

Expand Down Expand Up @@ -155,14 +165,14 @@ type TxFetcher struct {

// Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]struct{} // Waiting announcements grouped by peer (DoS protection)
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadata // Waiting announcements grouped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
announces map[string]map[common.Hash]struct{} // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash
announces map[string]map[common.Hash]*txMetadata // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
Expand All @@ -175,6 +185,7 @@ type TxFetcher struct {
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation

step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Time wrapper to simulate in tests
Expand All @@ -183,14 +194,14 @@ type TxFetcher struct {

// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, mclock.System{}, nil)
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, dropPeer, mclock.System{}, nil)
}

// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
clock mclock.Clock, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
Expand All @@ -199,8 +210,8 @@ func NewTxFetcherForTests(
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]struct{}),
waitslots: make(map[string]map[common.Hash]*txMetadata),
announces: make(map[string]map[common.Hash]*txMetadata),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
Expand All @@ -209,14 +220,15 @@ func NewTxFetcherForTests(
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
dropPeer: dropPeer,
clock: clock,
rand: rand,
}
}

// Notify announces the fetcher of the potential availability of a new batch of
// transactions in the network.
func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []common.Hash) error {
// Keep track of all the announced transactions
txAnnounceInMeter.Mark(int64(len(hashes)))

Expand All @@ -226,28 +238,35 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
// still valuable to check here because it runs concurrent to the internal
// loop, so anything caught here is time saved internally.
var (
unknowns = make([]common.Hash, 0, len(hashes))
unknownHashes = make([]common.Hash, 0, len(hashes))
unknownMetas = make([]*txMetadata, 0, len(hashes))

duplicate int64
underpriced int64
)
for _, hash := range hashes {
for i, hash := range hashes {
switch {
case f.hasTx(hash):
duplicate++
case f.isKnownUnderpriced(hash):
underpriced++
default:
unknowns = append(unknowns, hash)
unknownHashes = append(unknownHashes, hash)
if types == nil {
unknownMetas = append(unknownMetas, nil)
} else {
unknownMetas = append(unknownMetas, &txMetadata{kind: types[i], size: sizes[i]})
}
}
}
txAnnounceKnownMeter.Mark(duplicate)
txAnnounceUnderpricedMeter.Mark(underpriced)

// If anything's left to announce, push it into the internal loop
if len(unknowns) == 0 {
if len(unknownHashes) == 0 {
return nil
}
announce := &txAnnounce{origin: peer, hashes: unknowns}
announce := &txAnnounce{origin: peer, hashes: unknownHashes, metas: unknownMetas}
select {
case f.notify <- announce:
return nil
Expand Down Expand Up @@ -290,6 +309,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
// re-requesting them and dropping the peer in case of malicious transfers.
var (
added = make([]common.Hash, 0, len(txs))
metas = make([]txMetadata, 0, len(txs))
)
// proceed in batches
for i := 0; i < len(txs); i += 128 {
Expand Down Expand Up @@ -325,6 +345,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
otherreject++
}
added = append(added, batch[j].Hash())
metas = append(metas, txMetadata{
kind: batch[j].Type(),
size: uint32(batch[j].Size()),
})
}
knownMeter.Mark(duplicate)
underpricedMeter.Mark(underpriced)
Expand All @@ -337,7 +361,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
}
}
select {
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
case f.cleanup <- &txDelivery{origin: peer, hashes: added, metas: metas, direct: direct}:
return nil
case <-f.quit:
return errTerminated
Expand Down Expand Up @@ -394,13 +418,15 @@ func (f *TxFetcher) loop() {
want := used + len(ann.hashes)
if want > maxTxAnnounces {
txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces))

ann.hashes = ann.hashes[:want-maxTxAnnounces]
ann.metas = ann.metas[:want-maxTxAnnounces]
}
// All is well, schedule the remainder of the transactions
idleWait := len(f.waittime) == 0
_, oldPeer := f.announces[ann.origin]

for _, hash := range ann.hashes {
for i, hash := range ann.hashes {
// If the transaction is already downloading, add it to the list
// of possible alternates (in case the current retrieval fails) and
// also account it for the peer.
Expand All @@ -409,9 +435,9 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
Expand All @@ -422,22 +448,28 @@ func (f *TxFetcher) loop() {

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = ann.metas[i]
} else {
f.announces[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.announces[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
// If the transaction is already known to the fetcher, but not
// yet downloading, add the peer as an alternate origin in the
// waiting list.
if f.waitlist[hash] != nil {
// Ignore double announcements from the same peer. This is
// especially important if metadata is also passed along to
// prevent malicious peers flip-flopping good/bad values.
if _, ok := f.waitlist[hash][ann.origin]; ok {
continue
}
Comment on lines +461 to +466
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to double-check the logic here. So, say tx A has size N. Malicious peer A announces it to us, as size M and peer B announces it as size N (correctly).
Now, peer A does not deliver, so we instead ask peer B for it, and peer B delivers a tx with size N.

  • Do we hold both meta's, and thus expect A to have M if coming from A and N if coming from B?
  • If not, how do we prevent an attack where a malicious announcement is used to punish an honest tx-delivery?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also, would it make sense to somehow flag (warn) if we notice this behaviour: peers announcing different meta for a transaction.
It could point to shenanigans of this sort, and also of implementation flaws.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Answering my own question here:

  • Do we hold both meta's, and thus expect A to have M if coming from A and N if coming from B?

Yes. So attack not possible here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... and furthermore, once the honest peer delivers tx A, the lie from peer A will be discovered and it will be ejected, since we iterate over all announcements in f.announces. Good

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also, would it make sense to somehow flag (warn) if we notice this behaviour: peers announcing different meta for a transaction.
It could point to shenanigans of this sort, and also of implementation flaws.

In the end, it will turn into a Warn when the bad peer gets dropped. IMO there's no need to find announce clashes at the moment of announcements, when delivery happens we'll know for sure who's right and if anyone's wrong.

I guess what we could do is to add a second warning log where we log that someone was a "bit" off, but not off enough to drop.

f.waitlist[hash][ann.origin] = struct{}{}

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = struct{}{}
waitslots[hash] = ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
continue
}
Expand All @@ -446,9 +478,9 @@ func (f *TxFetcher) loop() {
f.waittime[hash] = f.clock.Now()

if waitslots := f.waitslots[ann.origin]; waitslots != nil {
waitslots[hash] = struct{}{}
waitslots[hash] = ann.metas[i]
} else {
f.waitslots[ann.origin] = map[common.Hash]struct{}{hash: {}}
f.waitslots[ann.origin] = map[common.Hash]*txMetadata{hash: ann.metas[i]}
}
}
// If a new item was added to the waitlist, schedule it into the fetcher
Expand All @@ -474,9 +506,9 @@ func (f *TxFetcher) loop() {
f.announced[hash] = f.waitlist[hash]
for peer := range f.waitlist[hash] {
if announces := f.announces[peer]; announces != nil {
announces[hash] = struct{}{}
announces[hash] = f.waitslots[peer][hash]
} else {
f.announces[peer] = map[common.Hash]struct{}{hash: {}}
f.announces[peer] = map[common.Hash]*txMetadata{hash: f.waitslots[peer][hash]}
}
delete(f.waitslots[peer], hash)
if len(f.waitslots[peer]) == 0 {
Expand Down Expand Up @@ -545,10 +577,27 @@ func (f *TxFetcher) loop() {

case delivery := <-f.cleanup:
// Independent if the delivery was direct or broadcast, remove all
// traces of the hash from internal trackers
for _, hash := range delivery.hashes {
// traces of the hash from internal trackers. That said, compare any
// advertised metadata with the real ones and drop bad peers.
for i, hash := range delivery.hashes {
if _, ok := f.waitlist[hash]; ok {
for peer, txset := range f.waitslots {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
} else if delivery.metas[i].size != meta.size {
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)
if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
// Normally we should drop a peer considering this is a protocol violation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Impressive level of indentation reached here. switch case for if for if if if. I don't have any great suggestions, but if you can make it less that would be supernice

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't :D

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it's very hard to follow the code here. Like, you check if _, ok := f.waitlist[hash]; ok {, (but never use whatever object it is inside that waitlist, but if it's present, you check the waitslots, otherwise you operate on the announces.

It might make perfect sense to you who wrote the code, but for a reviewer (and maybe even for yourself two years from now), it's pretty hard to ensure that it does what it should, and that all conditions are correct.

Copy link
Contributor

@holiman holiman Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, couldn't it look something like this? (I mean part of it, there would need to be some more code too)

		case delivery := <-f.cleanup:
			// Independent if the delivery was direct or broadcast, remove all
			// traces of the hash from internal trackers. That said, compare any
			// advertised metadata with the real ones and drop bad peers.
			f.onDeliveries(delivery, func(announced, actual *txMetadata) {
				if announced.kind != actual.kind {
					log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", announced.kind, "ann", actual.kind)
					f.dropPeer(peer)
				} else if announced.size != meta.size {
					log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", announced.size, "ann", actual.size)
					if math.Abs(float64(announced.size)-float64(actual.size)) > 8 {
						// Normally we should drop a peer considering this is a protocol violation.
						// However, due to the RLP vs consensus format messyness, allow a few bytes
						// wiggle-room where we only warn, but don't drop.
						//
						// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
						f.dropPeer(peer)
					}
				}
			})

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this logic is used twice here wouldn't it make sense to combine it into an anonymous function?

verifyMetadata := func(meta *txMetaData) {
    if meta == nil {
       return
    }
    if delivery.metas[i].kind != meta.kind {
    ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah damn, Martin was quicker :D

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could probably clean the code up, but at this point I need to get this 5 liner change out of the way and focus on the rest of 4844 and don't want to diverge a month into splitting up the fetcher nice and elegantly. Though TBH, apart from introducing 10 more functions, I don't think it's gonna be much more elegant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but I'll take that "You could probably clean the code up" literally, and after we merge this PR, I want to clean it up. For example like this:

https://github.com/karalabe/go-ethereum/compare/enforce-tx-announcement-metas...holiman:go-ethereum:enforce-tx-announcement-metas-mod?expand=1

Even if it means "introducing 10 more functions", I think 10 separate functions is preferrable over a gigantic switch with hundred-liner cases; it increases both readability and testability.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The catch with a lot of small methods is that people have the tendency to reuse them. Ok, this sounds weird. But the current logic has a very very strict flow. When there are many methods, people tend to maybe turn one or the other into more generic ones, tweak them, reuse them and then the flow can get completely whacked out because someone forgot some ordering constaint. That's usually the reason I do large methods. It's hard to mess it up in the future.

// However, due to the RLP vs consensus format messyness, allow a few bytes
// wiggle-room where we only warn, but don't drop.
//
// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
f.dropPeer(peer)
}
}
}
delete(txset, hash)
if len(txset) == 0 {
delete(f.waitslots, peer)
Expand All @@ -558,6 +607,22 @@ func (f *TxFetcher) loop() {
delete(f.waittime, hash)
} else {
for peer, txset := range f.announces {
if meta := txset[hash]; meta != nil {
if delivery.metas[i].kind != meta.kind {
log.Warn("Announced transaction type mismatch", "peer", peer, "tx", hash, "type", delivery.metas[i].kind, "ann", meta.kind)
f.dropPeer(peer)
} else if delivery.metas[i].size != meta.size {
log.Warn("Announced transaction size mismatch", "peer", peer, "tx", hash, "size", delivery.metas[i].size, "ann", meta.size)
if math.Abs(float64(delivery.metas[i].size)-float64(meta.size)) > 8 {
// Normally we should drop a peer considering this is a protocol violation.
// However, due to the RLP vs consensus format messyness, allow a few bytes
// wiggle-room where we only warn, but don't drop.
//
// TODO(karalabe): Get rid of this relaxation when clients are proven stable.
f.dropPeer(peer)
}
}
}
delete(txset, hash)
if len(txset) == 0 {
delete(f.announces, peer)
Expand Down Expand Up @@ -859,7 +924,7 @@ func (f *TxFetcher) forEachPeer(peers map[string]struct{}, do func(peer string))

// forEachHash does a range loop over a map of hashes in production, but during
// testing it does a deterministic sorted random to allow reproducing issues.
func (f *TxFetcher) forEachHash(hashes map[common.Hash]struct{}, do func(hash common.Hash) bool) {
func (f *TxFetcher) forEachHash(hashes map[common.Hash]*txMetadata, do func(hash common.Hash) bool) {
// If we're running production, use whatever Go's map gives us
if f.rand == nil {
for hash := range hashes {
Expand Down