
[Merged by Bors] - sync: prioritize peers with higher success rate and low latency #5143

Closed
wants to merge 11 commits
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -27,6 +27,10 @@ See [RELEASE](./RELEASE.md) for workflow instructions.

Submitting a proof of work should now be up to 40% faster thanks to [code optimization](https://github.com/spacemeshos/poet/pull/419).

* [#5143](https://github.com/spacemeshos/go-spacemesh/pull/5143) Select good peers for sync requests.

The change improves initial sync speed as well as the sync protocol requests issued during consensus.

## v1.2.0

### Upgrade information
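To make the changelog entry above concrete, here is a small, self-contained sketch of the selection policy the PR title describes: peers with a higher success rate are preferred, and ties are broken by lower observed latency. Everything in this snippet (`peerStats`, `selectBest`, the scoring details) is illustrative only; it is not the implementation in the `fetch/peers` package used by the diff below.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// peerStats is a hypothetical per-peer record: request outcomes and a latency estimate.
type peerStats struct {
	id      string
	success int
	failure int
	latency time.Duration
}

// successRate returns the fraction of requests to this peer that succeeded.
func (p peerStats) successRate() float64 {
	total := p.success + p.failure
	if total == 0 {
		return 0
	}
	return float64(p.success) / float64(total)
}

// selectBest ranks peers by success rate (higher first) and, on ties, by
// latency (lower first), returning at most n of them. This mirrors the idea
// behind SelectBest(RedundantPeers) in the diff below.
func selectBest(stats []peerStats, n int) []string {
	sorted := append([]peerStats(nil), stats...)
	sort.Slice(sorted, func(i, j int) bool {
		ri, rj := sorted[i].successRate(), sorted[j].successRate()
		if ri != rj {
			return ri > rj
		}
		return sorted[i].latency < sorted[j].latency
	})
	if n > len(sorted) {
		n = len(sorted)
	}
	best := make([]string, 0, n)
	for _, p := range sorted[:n] {
		best = append(best, p.id)
	}
	return best
}

func main() {
	stats := []peerStats{
		{id: "peer-a", success: 90, failure: 10, latency: 120 * time.Millisecond},
		{id: "peer-b", success: 99, failure: 1, latency: 300 * time.Millisecond},
		{id: "peer-c", success: 99, failure: 1, latency: 80 * time.Millisecond},
	}
	// peer-b and peer-c share the best success rate; peer-c wins on latency.
	fmt.Println(selectBest(stats, 2)) // [peer-c peer-b]
}
```

In the diff below, `organizeRequests` asks for up to `RedundantPeers` such peers via `f.peers.SelectBest(RedundantPeers)` and falls back to a random member of that set when no tracked peer is known to hold a given hash.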
163 changes: 96 additions & 67 deletions fetch/fetch.go
@@ -4,17 +4,17 @@
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/network"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/server"
@@ -30,6 +30,8 @@
OpnProtocol = "lp/2"

cacheSize = 1000

RedundantPeers = 10
)

var (
@@ -79,7 +81,6 @@
// Config is the configuration of the Fetch component.
type Config struct {
BatchTimeout time.Duration // in milliseconds
MaxRetriesForPeer int
BatchSize, QueueSize int
RequestTimeout time.Duration // in seconds
MaxRetriesForRequest int
@@ -89,7 +90,6 @@
func DefaultConfig() Config {
return Config{
BatchTimeout: time.Millisecond * time.Duration(50),
MaxRetriesForPeer: 2,
QueueSize: 20,
BatchSize: 20,
RequestTimeout: time.Second * time.Duration(10),
@@ -144,6 +144,7 @@
logger log.Log
bs *datastore.BlobStore
host host
peers *peers.Peers

servers map[string]requester
validators *dataValidators
@@ -165,13 +166,20 @@
}

// NewFetch creates a new Fetch struct.
func NewFetch(cdb *datastore.CachedDB, msh meshProvider, b system.BeaconGetter, host *p2p.Host, opts ...Option) *Fetch {
func NewFetch(
cdb *datastore.CachedDB,
msh meshProvider,
b system.BeaconGetter,
host *p2p.Host,
opts ...Option,
) *Fetch {
bs := datastore.NewBlobStore(cdb.Database)
f := &Fetch{
cfg: DefaultConfig(),
logger: log.NewNop(),
bs: bs,
host: host,
peers: peers.New(),
servers: map[string]requester{},
unprocessed: make(map[types.Hash32]*request),
ongoing: make(map[types.Hash32]*request),
@@ -181,6 +189,28 @@
for _, opt := range opts {
opt(f)
}
// NOTE(dshulyak) this is to avoid refactoring the tests.
// there is one test that covers this part.
if host != nil {
connectedf := func(peer p2p.Peer) {
f.logger.With().Debug("add peer", log.Stringer("id", peer))
f.peers.Add(peer)
}
host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(_ network.Network, c network.Conn) {
connectedf(c.RemotePeer())
},
DisconnectedF: func(_ network.Network, c network.Conn) {
f.logger.With().Debug("remove peer", log.Stringer("id", c.RemotePeer()))
f.peers.Delete(c.RemotePeer())
},
})
for _, peer := range host.GetPeers() {
if host.Connected(peer) {
connectedf(peer)
}
}
}

f.batchTimeout = time.NewTicker(f.cfg.BatchTimeout)
srvOpts := []server.Opt{
@@ -190,11 +220,23 @@
if len(f.servers) == 0 {
h := newHandler(cdb, bs, msh, b, f.logger)
f.servers[atxProtocol] = server.New(host, atxProtocol, h.handleEpochInfoReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(host, lyrDataProtocol, h.handleLayerDataReq, srvOpts...)
f.servers[lyrDataProtocol] = server.New(
host,
lyrDataProtocol,
h.handleLayerDataReq,
srvOpts...)
f.servers[hashProtocol] = server.New(host, hashProtocol, h.handleHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(host, meshHashProtocol, h.handleMeshHashReq, srvOpts...)
f.servers[meshHashProtocol] = server.New(
host,
meshHashProtocol,
h.handleMeshHashReq,
srvOpts...)
f.servers[malProtocol] = server.New(host, malProtocol, h.handleMaliciousIDsReq, srvOpts...)
f.servers[OpnProtocol] = server.New(host, OpnProtocol, h.handleLayerOpinionsReq2, srvOpts...)
f.servers[OpnProtocol] = server.New(
host,
OpnProtocol,
h.handleLayerOpinionsReq2,
srvOpts...)
}
return f
}
@@ -255,9 +297,6 @@
f.logger.Info("stopping fetch")
f.batchTimeout.Stop()
f.cancel()
if err := f.host.Close(); err != nil {
f.logger.With().Warning("error closing host", log.Err(err))
}
f.mu.Lock()
for _, req := range f.unprocessed {
close(req.promise.completed)
@@ -423,7 +462,9 @@
var requestList []RequestMessage
// only send one request per hash
for hash, req := range f.unprocessed {
f.logger.WithContext(req.ctx).With().Debug("processing hash request", log.Stringer("hash", hash))
f.logger.WithContext(req.ctx).
With().
Debug("processing hash request", log.Stringer("hash", hash))
requestList = append(requestList, RequestMessage{Hash: hash, Hint: req.hint})
// move the processed requests to pending
f.ongoing[hash] = req
@@ -450,16 +491,17 @@
peer: peer,
}
batch.setID()
_ = f.sendBatch(peer, batch)
f.sendBatch(peer, batch)
}
}
}

func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]RequestMessage {
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
peer2requests := make(map[p2p.Peer][]RequestMessage)
peers := f.host.GetPeers()
if len(peers) == 0 {

best := f.peers.SelectBest(RedundantPeers)
if len(best) == 0 {
f.logger.Info("cannot send batch: no peers found")
f.mu.Lock()
defer f.mu.Unlock()
@@ -479,16 +521,10 @@
return nil
}
for _, req := range requests {
target := p2p.NoPeer
hashPeers := f.hashToPeers.GetRandom(req.Hash, req.Hint, rng)
for _, p := range hashPeers {
if f.host.Connected(p) {
target = p
break
}
}
target := f.peers.SelectBestFrom(hashPeers)
if target == p2p.NoPeer {
target = randomPeer(peers)
target = randomPeer(best)
}
_, ok := peer2requests[target]
if !ok {
@@ -519,51 +555,42 @@
}

// sendBatch dispatches batched request messages to provided peer.
func (f *Fetch) sendBatch(p p2p.Peer, batch *batchInfo) error {
func (f *Fetch) sendBatch(peer p2p.Peer, batch *batchInfo) {
if f.stopped() {
return
}

f.mu.Lock()
f.batched[batch.ID] = batch
f.mu.Unlock()

f.logger.With().Debug("sending batch request",
f.logger.With().Debug("sending batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Stringer("peer", batch.peer))
// timeout function will be called if no response was received for the hashes sent
errorFunc := func(err error) {
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", peer),
)
// Request is asynchronous;
// it returns an error only if the encoded payload is too large
// or the target peer is not connected.
start := time.Now()
errf := func(err error) {
f.logger.With().Warning("failed to send batch",
log.Stringer("batch_hash", batch.ID),
log.Err(err))
log.Stringer("batch_hash", peer), log.Err(err),
)
f.peers.OnFailure(peer)
f.handleHashError(batch.ID, err)
}

bytes, err := codec.Encode(&batch.RequestBatch)
err := f.servers[hashProtocol].Request(
f.shutdownCtx,
peer,
codec.MustEncode(&batch.RequestBatch),
func(buf []byte) {
f.peers.OnLatency(peer, time.Since(start))
f.receiveResponse(buf)
},
errf,
)
if err != nil {
f.logger.With().Panic("failed to encode batch", log.Err(err))
}

// try sending batch to provided peer
retries := 0
for {
if f.stopped() {
return nil
}

f.logger.With().Debug("sending batched request to peer",
log.Stringer("batch_hash", batch.ID),
log.Int("num_requests", len(batch.Requests)),
log.Stringer("peer", p))

err = f.servers[hashProtocol].Request(f.shutdownCtx, p, bytes, f.receiveResponse, errorFunc)
if err == nil {
break
}

retries++
if retries > f.cfg.MaxRetriesForPeer {
f.handleHashError(batch.ID, fmt.Errorf("batched request failed w retries: %w", err))
break
}
errf(err)
}
return err
}

// handleHashError is called when an error occurs while processing a batch of the given hashes.
@@ -580,7 +607,8 @@
for _, br := range batch.Requests {
req, ok := f.ongoing[br.Hash]
if !ok {
f.logger.With().Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash))
f.logger.With().
Warning("hash missing from ongoing requests", log.Stringer("hash", br.Hash))

continue
}
f.logger.WithContext(req.ctx).With().Warning("hash request failed",
@@ -596,7 +624,12 @@

// getHash is the regular buffered call to get a specific hash. Using the provided hint h, the receiving end
// knows where to look for the hash. It returns a promise that will hold the received data or an error.
func (f *Fetch) getHash(ctx context.Context, hash types.Hash32, h datastore.Hint, receiver dataReceiver) (*promise, error) {
func (f *Fetch) getHash(
ctx context.Context,
hash types.Hash32,
h datastore.Hint,
receiver dataReceiver,
) (*promise, error) {
if f.stopped() {
return nil, f.shutdownCtx.Err()
}
@@ -650,10 +683,6 @@
f.hashToPeers.RegisterPeerHashes(peer, hashes)
}

func (f *Fetch) GetPeers() []p2p.Peer {
return f.host.GetPeers()
}

func (f *Fetch) PeerProtocols(p p2p.Peer) ([]protocol.ID, error) {
return f.host.PeerProtocols(p)
}

func (f *Fetch) SelectBest(n int) []p2p.Peer {
return f.peers.SelectBest(n)
}
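The `fetch/peers` package itself is not part of this file's diff, so its definition is not shown here. Purely as a reading aid, the interface below collects the operations `fetch.go` now relies on; it is a sketch inferred from the call sites above (`peers.New`, `Add`, `Delete`, `OnFailure`, `OnLatency`, `SelectBest`, `SelectBestFrom`), not the package's actual declaration, and the `Peer` alias merely stands in for `p2p.Peer` (the real package exposes a concrete `Peers` struct built with `peers.New()`).

```go
// Package peerstracker sketches the surface that fetch.go relies on in this PR.
// It is inferred from call sites only; the real fetch/peers package may differ.
package peerstracker

import "time"

// Peer stands in for go-spacemesh's p2p.Peer in this sketch.
type Peer = string

// Tracker lists the operations used by fetch.go in the diff: connection events
// feed Add/Delete, request outcomes feed OnFailure/OnLatency, and batch routing
// uses SelectBest/SelectBestFrom.
type Tracker interface {
	// Add registers a newly connected peer.
	Add(p Peer)
	// Delete removes a disconnected peer.
	Delete(p Peer)
	// OnFailure records a failed request to p.
	OnFailure(p Peer)
	// OnLatency records a successful request to p and its round-trip time.
	OnLatency(p Peer, latency time.Duration)
	// SelectBest returns up to n peers ranked by success rate and latency.
	SelectBest(n int) []Peer
	// SelectBestFrom returns the best-ranked peer among candidates,
	// or the zero value when none is tracked.
	SelectBestFrom(candidates []Peer) Peer
}
```

The feedback loop in `sendBatch` is what makes this ranking meaningful: each successful response reports its round-trip time via `OnLatency`, each send error reports `OnFailure`, so slow or flaky peers gradually drop out of the set returned by `SelectBest` and subsequent batches are routed to better peers.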