Skip to content

Commit

Permalink
fetch: requests got lost when there are no peers (#4849)
Browse files Browse the repository at this point in the history
## Motivation
Requests were not restored to the queue when there were no peers to dispatch them to.

## Changes
- restore requests when there are no peers
- push mesh to still make progress even when there are missing blocks
- increase frequency of state sync

## Testing
I manually tested against mainnet, artificially causing GetPeers() to return zero peers.
The mesh was making very slow progress because it aborted as soon as it found a missing block, not executing the next block even when it was available.
  • Loading branch information
countvonzero committed Aug 16, 2023
1 parent e0575cb commit 52710cc
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 61 deletions.
19 changes: 16 additions & 3 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ func (f *Fetch) send(requests []RequestMessage) {
}

peer2batches := f.organizeRequests(requests)

for peer, peerBatches := range peer2batches {
for _, reqs := range peerBatches {
batch := &batchInfo{
Expand All @@ -457,8 +456,22 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][][]Req
peer2requests := make(map[p2p.Peer][]RequestMessage)
peers := f.host.GetPeers()
if len(peers) == 0 {
f.logger.Info("cannot send fetch: no peers found")
// in loop() we will try again after the batchTimeout
f.logger.Info("cannot send batch: no peers found")
f.mu.Lock()
defer f.mu.Unlock()
errNoPeer := errors.New("no peers")
for _, msg := range requests {
if req, ok := f.ongoing[msg.Hash]; ok {
req.promise.err = errNoPeer
close(req.promise.completed)
delete(f.ongoing, req.hash)
} else {
f.logger.With().Error("ongoing request missing",
log.Stringer("hash", msg.Hash),
log.String("hint", string(msg.Hint)),
)
}
}
return nil
}

Expand Down
47 changes: 35 additions & 12 deletions mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ func (msh *Mesh) ProcessLayer(ctx context.Context, lid types.LayerID) error {
msh.pendingUpdates.min = types.MinLayer(msh.pendingUpdates.min, results[0].Layer)
msh.pendingUpdates.max = types.MaxLayer(msh.pendingUpdates.max, results[len(results)-1].Layer)
}
if next := msh.LatestLayerInState() + 1; next < msh.pendingUpdates.min {
next := msh.LatestLayerInState() + 1
if next < msh.pendingUpdates.min {
msh.pendingUpdates.min = next
pending = true
}
Expand All @@ -318,34 +319,55 @@ func (msh *Mesh) ProcessLayer(ctx context.Context, lid types.LayerID) error {
})),
)
}
if missing := missingBlocks(results); len(missing) > 0 {
applicable, missing := filterMissing(results, next)
if len(missing) > 0 {
select {
case <-ctx.Done():
case msh.missingBlocks <- missing:
}
return fmt.Errorf("%w: request missing blocks %v", ErrMissingBlock, missing)
if len(applicable) == 0 {
return fmt.Errorf("%w: request missing blocks %v", ErrMissingBlock, missing)
}
}
if err := msh.ensureStateConsistent(ctx, results); err != nil {

if err := msh.ensureStateConsistent(ctx, applicable); err != nil {
return err
}
if err := msh.applyResults(ctx, results); err != nil {
if err := msh.applyResults(ctx, applicable); err != nil {
return err
}
msh.pendingUpdates.min = 0
msh.pendingUpdates.max = 0
if len(missing) > 0 {
msh.pendingUpdates.min = msh.LatestLayerInState()
} else {
msh.pendingUpdates.min = 0
msh.pendingUpdates.max = 0
}
return nil
}

func missingBlocks(results []result.Layer) []types.BlockID {
var response []types.BlockID
for _, layer := range results {
// filterMissing splits tortoise results into the prefix of layers that can be
// applied now and the IDs of blocks whose data still has to be fetched.
//
// A block counts as missing when it is needed for state (valid, hare-certified,
// or locally produced) but its data has not been downloaded yet. If the first
// layer containing such a gap is at or before next (the next layer to be
// applied), nothing is applicable and the caller must wait for the fetch;
// otherwise every layer strictly before the gap is returned as applicable.
// With no gaps at all, the full result set is applicable.
func filterMissing(results []result.Layer, next types.LayerID) ([]result.Layer, []types.BlockID) {
	firstGap := -1
	var needed []types.BlockID
	for i := range results {
		for _, block := range results[i].Blocks {
			// needed for state but data not yet downloaded
			if block.Data || !(block.Valid || block.Hare || block.Local) {
				continue
			}
			needed = append(needed, block.Header.ID)
			if firstGap < 0 {
				firstGap = i
			}
		}
	}
	switch {
	case firstGap < 0:
		// no gaps: everything is applicable
		return results, nil
	case results[firstGap].Layer <= next:
		// the very next layer to apply is blocked: nothing is applicable
		return nil, needed
	default:
		return results[:firstGap], needed
	}
}

func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error {
Expand Down Expand Up @@ -409,6 +431,7 @@ func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error

msh.logger.With().Debug("state persisted",
log.Context(ctx),
log.Stringer("layer", layer.Layer),
log.Stringer("applied", target),
)
if layer.Layer > msh.LatestLayerInState() {
Expand Down
113 changes: 68 additions & 45 deletions mesh/mesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ func TestProcessLayer(t *testing.T) {

// outputs
err string
executed []types.BlockID
applied []types.BlockID
executed map[types.LayerID]types.BlockID
applied map[types.LayerID]types.BlockID
validity map[types.BlockID]bool
}
type testCase struct {
Expand All @@ -395,15 +395,38 @@ func TestProcessLayer(t *testing.T) {
rblock(idg("1"), fixture.Good()),
rblock(idg("2"), fixture.Data(), fixture.Invalid())),
),
executed: []types.BlockID{idg("1")},
applied: []types.BlockID{idg("1")},
executed: map[types.LayerID]types.BlockID{start: idg("1")},
applied: map[types.LayerID]types.BlockID{start: idg("1")},
validity: map[types.BlockID]bool{
idg("1"): true,
idg("2"): false,
},
},
},
},
{
"missing but can make progress",
[]call{
{
updates: rlayers(
rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())),
rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())),
rlayer(start+2, rblock(idg("3"), fixture.Valid())),
rlayer(start+3, rblock(idg("4"), fixture.Valid(), fixture.Data())),
),
executed: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
},
{
results: rlayers(
rlayer(start+2, rblock(idg("3"), fixture.Valid(), fixture.Data())),
rlayer(start+3, rblock(idg("4"), fixture.Valid(), fixture.Data())),
),
executed: map[types.LayerID]types.BlockID{start + 2: idg("3"), start + 3: idg("4")},
applied: map[types.LayerID]types.BlockID{start + 2: idg("3"), start + 3: idg("4")},
},
},
},
{
"missing valid",
[]call{
Expand All @@ -417,8 +440,8 @@ func TestProcessLayer(t *testing.T) {
results: rlayers(
rlayer(start, rblock(idg("1"), fixture.Data(), fixture.Valid())),
),
executed: []types.BlockID{idg("1")},
applied: []types.BlockID{idg("1")},
executed: map[types.LayerID]types.BlockID{start: idg("1")},
applied: map[types.LayerID]types.BlockID{start: idg("1")},
validity: map[types.BlockID]bool{
idg("1"): true,
},
Expand All @@ -438,8 +461,8 @@ func TestProcessLayer(t *testing.T) {
results: rlayers(
rlayer(start, rblock(idg("1"), fixture.Invalid())),
),
executed: []types.BlockID{{}},
applied: []types.BlockID{{}},
executed: map[types.LayerID]types.BlockID{start: {}},
applied: map[types.LayerID]types.BlockID{start: {}},
},
},
},
Expand All @@ -450,15 +473,15 @@ func TestProcessLayer(t *testing.T) {
updates: rlayers(
rlayer(start),
),
executed: []types.BlockID{{}},
applied: []types.BlockID{{0}},
executed: map[types.LayerID]types.BlockID{start: {}},
applied: map[types.LayerID]types.BlockID{start: {0}},
},
{
updates: []result.Layer{
rlayer(start, rblock(idg("2"), fixture.Valid(), fixture.Data())),
},
executed: []types.BlockID{idg("2")},
applied: []types.BlockID{idg("2")},
executed: map[types.LayerID]types.BlockID{start: idg("2")},
applied: map[types.LayerID]types.BlockID{start: idg("2")},
},
},
},
Expand All @@ -469,14 +492,14 @@ func TestProcessLayer(t *testing.T) {
updates: rlayers(
rlayer(start, rblock(idg("1"), fixture.Hare(), fixture.Data())),
),
executed: []types.BlockID{idg("1")},
applied: []types.BlockID{idg("1")},
executed: map[types.LayerID]types.BlockID{start: idg("1")},
applied: map[types.LayerID]types.BlockID{start: idg("1")},
},
{
updates: []result.Layer{
rlayer(start, rblock(idg("1"), fixture.Hare(), fixture.Data(), fixture.Valid())),
},
applied: []types.BlockID{idg("1")},
applied: map[types.LayerID]types.BlockID{start: idg("1")},
},
},
},
Expand All @@ -487,15 +510,15 @@ func TestProcessLayer(t *testing.T) {
updates: rlayers(
rlayer(start, rblock(idg("1"), fixture.Hare(), fixture.Data())),
),
executed: []types.BlockID{idg("1")},
applied: []types.BlockID{idg("1")},
executed: map[types.LayerID]types.BlockID{start: idg("1")},
applied: map[types.LayerID]types.BlockID{start: idg("1")},
},
{
updates: rlayers(
rlayer(start.Add(1), rblock(idg("2"), fixture.Hare(), fixture.Data())),
),
executed: []types.BlockID{idg("2")},
applied: []types.BlockID{idg("1"), idg("2")},
executed: map[types.LayerID]types.BlockID{start: idg("2")},
applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
},
},
},
Expand All @@ -506,15 +529,15 @@ func TestProcessLayer(t *testing.T) {
updates: rlayers(
rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())),
),
executed: []types.BlockID{idg("1")},
applied: []types.BlockID{idg("1")},
executed: map[types.LayerID]types.BlockID{start: idg("1")},
applied: map[types.LayerID]types.BlockID{start: idg("1")},
},
{
updates: rlayers(
rlayer(start, rblock(idg("1"), fixture.Invalid(), fixture.Hare(), fixture.Data())),
),
executed: []types.BlockID{{0}},
applied: []types.BlockID{{0}},
executed: map[types.LayerID]types.BlockID{start: {0}},
applied: map[types.LayerID]types.BlockID{start: {0}},
},
},
},
Expand All @@ -526,16 +549,16 @@ func TestProcessLayer(t *testing.T) {
rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())),
rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())),
),
executed: []types.BlockID{idg("1"), idg("2")},
applied: []types.BlockID{idg("1"), idg("2")},
executed: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
},
{
updates: rlayers(
rlayer(start, rblock(idg("1"), fixture.Invalid(), fixture.Data())),
rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())),
),
executed: []types.BlockID{types.EmptyBlockID, idg("2")},
applied: []types.BlockID{types.EmptyBlockID, idg("2")},
executed: map[types.LayerID]types.BlockID{start: types.EmptyBlockID, start + 1: idg("2")},
applied: map[types.LayerID]types.BlockID{start: types.EmptyBlockID, start + 1: idg("2")},
},
},
},
Expand All @@ -547,8 +570,8 @@ func TestProcessLayer(t *testing.T) {
rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())),
rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())),
),
executed: []types.BlockID{idg("1"), idg("2")},
applied: []types.BlockID{idg("1"), idg("2")},
executed: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
},
{
updates: rlayers(
Expand All @@ -570,8 +593,8 @@ func TestProcessLayer(t *testing.T) {
rlayer(start+1,
rblock(idg("2"), fixture.Valid(), fixture.Data())),
),
executed: []types.BlockID{idg("3"), idg("2")},
applied: []types.BlockID{idg("3"), idg("2")},
executed: map[types.LayerID]types.BlockID{start: idg("3"), start + 1: idg("2")},
applied: map[types.LayerID]types.BlockID{start: idg("3"), start + 1: idg("2")},
},
},
},
Expand All @@ -592,8 +615,8 @@ func TestProcessLayer(t *testing.T) {
rlayer(start, rblock(idg("1"), fixture.Valid(), fixture.Data())),
rlayer(start+1, rblock(idg("2"), fixture.Valid(), fixture.Data())),
),
executed: []types.BlockID{idg("1"), idg("2")},
applied: []types.BlockID{idg("1"), idg("2")},
executed: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
},
},
},
Expand All @@ -609,8 +632,8 @@ func TestProcessLayer(t *testing.T) {
updates: rlayers(
fixture.RLayer(start),
),
executed: []types.BlockID{{}},
applied: []types.BlockID{{}},
executed: map[types.LayerID]types.BlockID{start: {}},
applied: map[types.LayerID]types.BlockID{start: {}},
},
},
},
Expand All @@ -631,8 +654,8 @@ func TestProcessLayer(t *testing.T) {
fixture.RBlock(fixture.IDGen("2"), fixture.Valid(), fixture.Data()),
),
),
executed: []types.BlockID{idg("1"), idg("2")},
applied: []types.BlockID{idg("1"), idg("2")},
executed: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
applied: map[types.LayerID]types.BlockID{start: idg("1"), start + 1: idg("2")},
},
},
},
Expand All @@ -648,24 +671,24 @@ func TestProcessLayer(t *testing.T) {
rblock(fixture.IDGen("2"), fixture.Invalid(), fixture.Data()),
),
),
executed: []types.BlockID{{}, {}},
applied: []types.BlockID{{}, {}},
executed: map[types.LayerID]types.BlockID{start: {}, start + 1: {}},
applied: map[types.LayerID]types.BlockID{start: {}, start + 1: {}},
},
{
updates: rlayers(
rlayer(start,
rblock(fixture.IDGen("1"), fixture.Invalid(), fixture.Data()),
)),
applied: []types.BlockID{{}, {}},
applied: map[types.LayerID]types.BlockID{start: {}, start + 1: {}},
},
{
updates: rlayers(
rlayer(start.Add(1),
rblock(fixture.IDGen("2"), fixture.Valid(), fixture.Data()),
),
),
executed: []types.BlockID{idg("2")},
applied: []types.BlockID{{}, idg("2")},
executed: map[types.LayerID]types.BlockID{start: idg("2")},
applied: map[types.LayerID]types.BlockID{start: {}, start + 1: idg("2")},
},
},
},
Expand Down Expand Up @@ -698,10 +721,10 @@ func TestProcessLayer(t *testing.T) {
} else {
require.NoError(t, err)
}
for i := range c.applied {
applied, err := layers.GetApplied(tm.cdb, start.Add(uint32(i)))
for lid, bid := range c.applied {
applied, err := layers.GetApplied(tm.cdb, lid)
require.NoError(t, err)
require.Equal(t, c.applied[i], applied)
require.Equal(t, bid, applied)
}
for bid, valid := range c.validity {
stored, err := blocks.IsValid(tm.cdb, bid)
Expand Down
2 changes: 1 addition & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func NewSyncer(
}

s.syncTimer = time.NewTicker(s.cfg.Interval)
s.validateTimer = time.NewTicker(s.cfg.Interval * 2)
s.validateTimer = time.NewTicker(s.cfg.Interval)
if s.dataFetcher == nil {
s.dataFetcher = NewDataFetch(mesh, fetcher, cdb, cache, s.logger)
}
Expand Down

0 comments on commit 52710cc

Please sign in to comment.