Skip to content

Commit

Permalink
fetch: check if hash peer is connected before using it (#4939)
Browse files Browse the repository at this point in the history
## Motivation
Closes #4938
  • Loading branch information
countvonzero committed Aug 31, 2023
1 parent 1f35f08 commit cedb7cc
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ to set lower expected latency in the network, eventually reducing layer time.
* [#4879](https://github.com/spacemeshos/go-spacemesh/pull/4795) Makes majority calculation weighted for optimistic filtering.
The network will start using the new algorithm at layer 18_000 (2023-09-14 20:00:00 +0000 UTC)
* [#4923](https://github.com/spacemeshos/go-spacemesh/pull/4923) Faster ballot eligibility validation. Improves sync speed.
* [#4939](https://github.com/spacemeshos/go-spacemesh/pull/4939) Make sure to fetch data from peers that are already connected.

## v1.1.2

Expand Down
22 changes: 7 additions & 15 deletions fetch/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

lru "github.com/hashicorp/golang-lru/v2"
"golang.org/x/exp/maps"

"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
Expand Down Expand Up @@ -71,24 +72,15 @@ func (hpc *HashPeersCache) Add(hash types.Hash32, peer p2p.Peer) {
hpc.add(hash, peer)
}

// GetRandom returns a random peer for a given hash.
func (hpc *HashPeersCache) GetRandom(hash types.Hash32, hint datastore.Hint, rng *rand.Rand) (p2p.Peer, bool) {
// GetRandom returns a randomized list of peers for a given hash.
func (hpc *HashPeersCache) GetRandom(hash types.Hash32, hint datastore.Hint, rng *rand.Rand) []p2p.Peer {
hpc.mu.Lock()
defer hpc.mu.Unlock()

hashPeersMap, exists := hpc.getWithStats(hash, hint)
if !exists {
return p2p.NoPeer, false
}
n := rng.Intn(len(hashPeersMap)) + 1
i := 0
for peer := range hashPeersMap {
i++
if i == n {
return peer, true
}
}
return p2p.NoPeer, false
pm, _ := hpc.getWithStats(hash, hint)
peers := maps.Keys(pm)
rng.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
return peers
}

// RegisterPeerHashes registers provided peer for a list of hashes.
Expand Down
22 changes: 13 additions & 9 deletions fetch/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ func TestAdd(t *testing.T) {

func TestGetRandom(t *testing.T) {
t.Parallel()
t.Run("no hash peers", func(t *testing.T) {
cache := NewHashPeersCache(10)
hash := types.RandomHash()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
peers := cache.GetRandom(hash, datastore.TXDB, rng)
require.Empty(t, peers)
})
t.Run("1Hash3Peers", func(t *testing.T) {
cache := NewHashPeersCache(10)
hash := types.RandomHash()
Expand All @@ -94,9 +101,8 @@ func TestGetRandom(t *testing.T) {
}()
wg.Wait()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
peer, exists := cache.GetRandom(hash, datastore.TXDB, rng)
require.Equal(t, true, exists)
require.Contains(t, []p2p.Peer{peer1, peer2, peer3}, peer)
peers := cache.GetRandom(hash, datastore.TXDB, rng)
require.ElementsMatch(t, []p2p.Peer{peer1, peer2, peer3}, peers)
})
t.Run("2Hashes1Peer", func(t *testing.T) {
cache := NewHashPeersCache(10)
Expand All @@ -115,12 +121,10 @@ func TestGetRandom(t *testing.T) {
}()
wg.Wait()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
randomPeer, exists := cache.GetRandom(hash1, datastore.TXDB, rng)
require.Equal(t, true, exists)
require.Equal(t, peer, randomPeer)
randomPeer, exists = cache.GetRandom(hash2, datastore.TXDB, rng)
require.Equal(t, true, exists)
require.Equal(t, peer, randomPeer)
randomPeers := cache.GetRandom(hash1, datastore.TXDB, rng)
require.Equal(t, []p2p.Peer{peer}, randomPeers)
randomPeers = cache.GetRandom(hash2, datastore.TXDB, rng)
require.Equal(t, []p2p.Peer{peer}, randomPeers)
})
}

Expand Down
21 changes: 13 additions & 8 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,18 +482,23 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req
}
return nil
}

for _, req := range requests {
p, exists := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
if !exists {
p = randomPeer(peers)
target := p2p.NoPeer
hashPeers := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
for _, p := range hashPeers {
if f.host.Connected(p) {
target = p
break
}
}

_, ok := peer2requests[p]
if target == p2p.NoPeer {
target = randomPeer(peers)
}
_, ok := peer2requests[target]
if !ok {
peer2requests[p] = []RequestMessage{req}
peer2requests[target] = []RequestMessage{req}
} else {
peer2requests[p] = append(peer2requests[p], req)
peer2requests[target] = append(peer2requests[target], req)
}
}

Expand Down
38 changes: 38 additions & 0 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,43 @@ func TestFetch_GetHash(t *testing.T) {
require.NotEqual(t, p1.completed, p2.completed)
}

func TestFetch_GetHashPeerNotConnected(t *testing.T) {
f := createFetch(t)
f.cfg.MaxRetriesForRequest = 0
f.cfg.MaxRetriesForPeer = 0
peer := p2p.Peer("buddy")
awol := p2p.Peer("notConnected")
f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer})
f.mh.EXPECT().ID().Return(p2p.Peer("self"))
f.mh.EXPECT().Connected(awol).Return(false)
hsh := types.RandomHash()
f.RegisterPeerHashes(awol, []types.Hash32{hsh})

res := ResponseMessage{
Hash: hsh,
Data: []byte("a"),
}
f.mHashS.EXPECT().Request(gomock.Any(), peer, gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ p2p.Peer, req []byte, okFunc func([]byte), _ func(error)) error {
var rb RequestBatch
err := codec.Decode(req, &rb)
require.NoError(t, err)
resBatch := ResponseBatch{
ID: rb.ID,
Responses: []ResponseMessage{res},
}
bts, err := codec.Encode(&resBatch)
require.NoError(t, err)
okFunc(bts)
return nil
})

p, err := f.getHash(context.TODO(), hsh, datastore.BlockDB, goodReceiver)
require.NoError(t, err)
f.requestHashBatchFromPeers()
<-p.completed
}

func TestFetch_RequestHashBatchFromPeers(t *testing.T) {
tt := []struct {
name string
Expand Down Expand Up @@ -164,6 +201,7 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) {
f.cfg.MaxRetriesForPeer = 0
peer := p2p.Peer("buddy")
f.mh.EXPECT().GetPeers().Return([]p2p.Peer{peer})
f.mh.EXPECT().Connected(peer).Return(true).AnyTimes()

hsh0 := types.RandomHash()
res0 := ResponseMessage{
Expand Down
1 change: 1 addition & 0 deletions fetch/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type meshProvider interface {
type host interface {
ID() p2p.Peer
GetPeers() []p2p.Peer
Connected(p2p.Peer) bool
PeerProtocols(p2p.Peer) ([]protocol.ID, error)
Close() error
}
1 change: 1 addition & 0 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func TestFetch_getHashes(t *testing.T) {
f.cfg.BatchSize = 2
f.cfg.MaxRetriesForRequest = 0
f.cfg.MaxRetriesForPeer = 0
f.mh.EXPECT().Connected(gomock.Any()).Return(true).AnyTimes()
peers := []p2p.Peer{p2p.Peer("buddy 0"), p2p.Peer("buddy 1")}
f.mh.EXPECT().GetPeers().Return(peers)
f.mh.EXPECT().ID().Return(p2p.Peer("self")).AnyTimes()
Expand Down
14 changes: 14 additions & 0 deletions fetch/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions p2p/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func (fh *Host) GetPeers() []Peer {
return fh.Host.Network().Peers()
}

func (fh *Host) Connected(p Peer) bool {
return fh.Host.Network().Connectedness(p) == network.Connected
}

// ConnectedPeerInfo retrieves a peer info object for the given peer.ID, if the
// given peer is not connected then nil is returned.
func (fh *Host) ConnectedPeerInfo(id peer.ID) *PeerInfo {
Expand Down

0 comments on commit cedb7cc

Please sign in to comment.