Skip to content

Commit

Permalink
fetch: parametrize success rate threshold (#5161)
Browse files Browse the repository at this point in the history
this threshold used to determine if peers are seen as equally responsive within certain threshold.
parametrizing it to run some experiments
  • Loading branch information
dshulyak committed Oct 16, 2023
1 parent 6fde86c commit 816744c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
9 changes: 8 additions & 1 deletion fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type Config struct {
BatchSize, QueueSize int
RequestTimeout time.Duration // in seconds
MaxRetriesForRequest int
PeersRateThreshold float64 `mapstructure:"peers-rate-threshold"`
}

// DefaultConfig is the default config for the fetch component.
Expand All @@ -94,6 +95,7 @@ func DefaultConfig() Config {
BatchSize: 20,
RequestTimeout: time.Second * time.Duration(10),
MaxRetriesForRequest: 100,
PeersRateThreshold: 0.02,
}
}

Expand Down Expand Up @@ -174,12 +176,12 @@ func NewFetch(
opts ...Option,
) *Fetch {
bs := datastore.NewBlobStore(cdb.Database)

f := &Fetch{
cfg: DefaultConfig(),
logger: log.NewNop(),
bs: bs,
host: host,
peers: peers.New(),
servers: map[string]requester{},
unprocessed: make(map[types.Hash32]*request),
ongoing: make(map[types.Hash32]*request),
Expand All @@ -189,6 +191,11 @@ func NewFetch(
for _, opt := range opts {
opt(f)
}
popts := []peers.Opt{}
if f.cfg.PeersRateThreshold != 0 {
popts = append(popts, peers.WithRateThreshold(f.cfg.PeersRateThreshold))
}
f.peers = peers.New(popts...)
// NOTE(dshulyak) this is to avoid tests refactoring.
// there is one test that covers this part.
if host != nil {
Expand Down
28 changes: 22 additions & 6 deletions fetch/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ func (p *data) successRate() float64 {
return float64(p.success) / float64(p.success+p.failures)
}

func (p *data) cmp(other *data) int {
func (p *data) cmp(other *data, rateThreshold float64) int {
if p == nil && other != nil {
return -1
}
const rateThreshold = 0.1
switch {
case p.rate-other.rate > rateThreshold:
return 1
Expand All @@ -41,13 +40,30 @@ func (p *data) cmp(other *data) int {
return strings.Compare(string(p.id), string(other.id))
}

func New() *Peers {
return &Peers{peers: map[peer.ID]*data{}}
type Opt func(*Peers)

func WithRateThreshold(rate float64) Opt {
return func(p *Peers) {
p.rateThreshold = rate
}
}

func New(opts ...Opt) *Peers {
p := &Peers{
peers: map[peer.ID]*data{},
rateThreshold: 0.1,
}
for _, opt := range opts {
opt(p)
}
return p
}

type Peers struct {
mu sync.Mutex
peers map[peer.ID]*data

rateThreshold float64
}

func (p *Peers) Add(id peer.ID) {
Expand Down Expand Up @@ -107,7 +123,7 @@ func (p *Peers) SelectBestFrom(peers []peer.ID) peer.ID {
if !exist {
continue
}
if best.cmp(pdata) == -1 {
if best.cmp(pdata, p.rateThreshold) == -1 {
best = pdata
}
}
Expand All @@ -134,7 +150,7 @@ func (p *Peers) SelectBest(n int) []peer.ID {
for _, peer := range p.peers {
worst := peer
for i := range cache {
if cache[i].cmp(worst) == -1 {
if cache[i].cmp(worst, p.rateThreshold) == -1 {
cache[i], worst = worst, cache[i]
}
}
Expand Down
2 changes: 1 addition & 1 deletion fetch/peers/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type event struct {
}

func withEvents(events []event) *Peers {
tracker := New()
tracker := New(WithRateThreshold(0.1))
for _, ev := range events {
if ev.delete {
tracker.Delete(ev.id)
Expand Down

0 comments on commit 816744c

Please sign in to comment.